summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2015-10-12 20:01:11 +0300
committerMichael Klishin <michael@novemberain.com>2015-10-12 20:01:11 +0300
commit44a0ddb72dc0337235bcecf63878658bac4288a4 (patch)
tree0754e646b74ac3290836fe391f446e98904e64f3 /src
parent63cc2bb9391a34901de50b092f59045b1daf9289 (diff)
parentdc72935607e5bf6c563556139ea6992899b8520a (diff)
downloadrabbitmq-server-git-44a0ddb72dc0337235bcecf63878658bac4288a4.tar.gz
Merge pull request #344 from rabbitmq/rabbitmq-server-336
Implements Mirror Queue Sync in Batches
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue.erl17
-rw-r--r--src/rabbit_mirror_queue_master.erl45
-rw-r--r--src/rabbit_mirror_queue_misc.erl75
-rw-r--r--src/rabbit_mirror_queue_slave.erl27
-rw-r--r--src/rabbit_mirror_queue_sync.erl169
-rw-r--r--src/rabbit_priority_queue.erl58
-rw-r--r--src/rabbit_variable_queue.erl132
7 files changed, 407 insertions, 116 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index a03bda13c9..2b808e206c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,6 +33,10 @@
-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
+-type(publish() :: {rabbit_types:basic_message(),
+ rabbit_types:message_properties(), boolean()}).
+-type(delivered_publish() :: {rabbit_types:basic_message(),
+ rabbit_types:message_properties()}).
-type(fetch_result(Ack) ::
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
@@ -104,6 +108,9 @@
rabbit_types:message_properties(), boolean(), pid(), flow(),
state()) -> state().
+%% Like publish/6 but for batches of publishes.
+-callback batch_publish([publish()], pid(), flow(), state()) -> state().
+
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
@@ -112,6 +119,11 @@
state())
-> {ack(), state()}.
+%% Like publish_delivered/5 but for batches of publishes.
+-callback batch_publish_delivered([delivered_publish()], pid(), flow(),
+ state())
+ -> {[ack()], state()}.
+
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
@@ -253,8 +265,9 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {delete_crashed, 1}, {purge, 1},
- {purge_acks, 1}, {publish, 6},
- {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1},
+ {purge_acks, 1}, {publish, 6}, {publish_delivered, 5},
+ {batch_publish, 4}, {batch_publish_delivered, 4},
+ {discard, 4}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4}, {fetch, 2},
{drop, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 7890128872..ee3a097a80 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,6 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, purge_acks/1, publish/6, publish_delivered/5,
+ batch_publish/4, batch_publish_delivered/4,
discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -147,13 +148,15 @@ sync_mirrors(HandleInfo, EmitStats,
QName, "Synchronising: " ++ Fmt ++ "~n", Params)
end,
Log("~p messages to synchronise", [BQ:len(BQS)]),
- {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ {ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName),
+ SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
+ Log("batch size: ~p", [SyncBatchSize]),
Ref = make_ref(),
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
- Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
+ Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
@@ -241,6 +244,27 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
+batch_publish(Publishes, ChPid, Flow,
+ State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Publishes1, false, MsgSizes} =
+ lists:foldl(fun ({Msg = #basic_message { id = MsgId },
+ MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
+ {[{Msg, MsgProps, true} | Pubs], %% [0]
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ Sizes + rabbit_basic:msg_size(Msg)}
+ end, {[], false, 0}, Publishes),
+ Publishes2 = lists:reverse(Publishes1),
+ ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Publishes2},
+ MsgSizes),
+ BQS1 = BQ:batch_publish(Publishes2, ChPid, Flow, BQS),
+ ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
+%% [0] When the slave process handles the publish command, it sets the
+%% IsDelivered flag to true, so to avoid iterating over the messages
+%% again at the slave, we do it here.
+
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, Flow, State = #state { gm = GM,
seen_status = SS,
@@ -253,6 +277,23 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
+batch_publish_delivered(Publishes, ChPid, Flow,
+ State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {false, MsgSizes} =
+ lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
+ {false, Sizes}) ->
+ {false = dict:is_key(MsgId, SS), %% ASSERTION
+ Sizes + rabbit_basic:msg_size(Msg)}
+ end, {false, 0}, Publishes),
+ ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
+ MsgSizes),
+ {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ {AckTags, ensure_monitoring(ChPid, State1)}.
+
discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index fee890476e..b8997faea5 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -22,7 +22,7 @@
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
- log_info/3, log_warning/3]).
+ sync_batch_size/1, log_info/3, log_warning/3]).
%% for testing only
-export([module/1]).
@@ -39,10 +39,13 @@
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -332,6 +335,14 @@ module(Mode) when is_binary(Mode) ->
end
end.
+validate_mode(Mode) ->
+ case module(Mode) of
+ {ok, _Module} ->
+ ok;
+ not_mirrored ->
+ {error, "~p is not a valid ha-mode value", [Mode]}
+ end.
+
is_mirrored(Q) ->
case module(Q) of
{ok, _} -> true;
@@ -355,6 +366,22 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
ok
end.
+sync_batch_size(#amqqueue{} = Q) ->
+ case policy(<<"ha-sync-batch-size">>, Q) of
+ none -> %% we need this case because none > 1 == true
+ default_batch_size();
+ BatchSize when BatchSize > 1 ->
+ BatchSize;
+ _ ->
+ default_batch_size()
+ end.
+
+-define(DEFAULT_BATCH_SIZE, 4096).
+
+default_batch_size() ->
+ rabbit_misc:get_env(rabbit, mirroring_sync_batch_size,
+ ?DEFAULT_BATCH_SIZE).
+
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
@@ -410,25 +437,37 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
+ SyncBatchSize = proplists:get_value(
+ <<"ha-sync-batch-size">>, KeyList, none),
PromoteOnShutdown = proplists:get_value(
<<"ha-promote-on-shutdown">>, KeyList, none),
- case {Mode, Params, SyncMode, PromoteOnShutdown} of
- {none, none, none, none} ->
+ case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
+ {none, none, none, none, none} ->
ok;
- {none, _, _, _} ->
+ {none, _, _, _, _} ->
{error, "ha-mode must be specified to specify ha-params, "
"ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
- case module(Mode) of
- {ok, M} -> case M:validate_policy(Params) of
- ok -> case validate_sync_mode(SyncMode) of
- ok -> validate_pos(PromoteOnShutdown);
- E -> E
- end;
- E -> E
- end;
- _ -> {error, "~p is not a valid ha-mode value", [Mode]}
- end
+ validate_policies(
+ [{Mode, fun validate_mode/1},
+ {Params, ha_params_validator(Mode)},
+ {SyncMode, fun validate_sync_mode/1},
+ {SyncBatchSize, fun validate_sync_batch_size/1},
+ {PromoteOnShutdown, fun validate_pos/1}])
+ end.
+
+ha_params_validator(Mode) ->
+ fun(Val) ->
+ {ok, M} = module(Mode),
+ M:validate_policy(Val)
+ end.
+
+validate_policies([]) ->
+ ok;
+validate_policies([{Val, Validator} | Rest]) ->
+ case Validator(Val) of
+ ok -> validate_policies(Rest);
+ E -> E
end.
validate_sync_mode(SyncMode) ->
@@ -440,6 +479,14 @@ validate_sync_mode(SyncMode) ->
"or \"automatic\", got ~p", [Mode]}
end.
+validate_sync_batch_size(none) ->
+ ok;
+validate_sync_batch_size(N) when is_integer(N) andalso N > 0 ->
+ ok;
+validate_sync_batch_size(N) ->
+ {error, "ha-sync-batch-size takes an integer greather than 0, "
+ "~p given", [N]}.
+
validate_pos(PromoteOnShutdown) ->
case PromoteOnShutdown of
<<"always">> -> ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7f309ab0b7..5da91c70c5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -851,6 +851,15 @@ process_instruction({publish, ChPid, Flow, MsgProps,
publish_or_discard(published, ChPid, MsgId, State),
BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({batch_publish, ChPid, Flow, Publishes}, State) ->
+ maybe_flow_ack(ChPid, Flow),
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ lists:foldl(fun ({#basic_message { id = MsgId },
+ _MsgProps, _IsDelivered}, St) ->
+ publish_or_discard(published, ChPid, MsgId, St)
+ end, State, Publishes),
+ BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
maybe_flow_ack(ChPid, Flow),
@@ -860,6 +869,24 @@ process_instruction({publish_delivered, ChPid, Flow, MsgProps,
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
{ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
+process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
+ maybe_flow_ack(ChPid, Flow),
+ {MsgIds,
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} =
+ lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps},
+ {MsgIds, St}) ->
+ {[MsgId | MsgIds],
+ publish_or_discard(published, ChPid, MsgId, St)}
+ end, {[], State}, Publishes),
+ true = BQ:is_empty(BQS),
+ {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
+ MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags),
+ State2 = lists:foldl(
+ fun ({MsgId, AckTag}, St) ->
+ maybe_store_ack(true, MsgId, AckTag, St)
+ end, State1 #state { backing_queue_state = BQS1 },
+ MsgIdsAndAcks),
+ {ok, State2};
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index b76422ee6b..534ef1afad 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/4, master_go/7, slave/7]).
+-export([master_prepare/4, master_go/8, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -45,7 +45,7 @@
%% || <--- ready ---- || ||
%% || <--- next* ---- || || }
%% || ---- msg* ----> || || } loop
-%% || || ---- sync_msg* ----> || }
+%% || || ---- sync_msgs* ---> || }
%% || || <--- (credit)* ----- || }
%% || <--- next ---- || ||
%% || ---- done ----> || ||
@@ -63,9 +63,10 @@
-spec(master_prepare/4 :: (reference(), rabbit_amqqueue:name(),
log_fun(), [pid()]) -> pid()).
--spec(master_go/7 :: (pid(), reference(), log_fun(),
+-spec(master_go/8 :: (pid(), reference(), log_fun(),
rabbit_mirror_queue_master:stats_fun(),
rabbit_mirror_queue_master:stats_fun(),
+ non_neg_integer(),
bq(), bqs()) ->
{'already_synced', bqs()} | {'ok', bqs()} |
{'shutdown', any(), bqs()} |
@@ -88,48 +89,65 @@ master_prepare(Ref, QName, Log, SPids) ->
syncer(Ref, Log, MPid, SPids)
end).
-master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
+master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
Args = {Syncer, Ref, Log, HandleInfo, EmitStats, rabbit_misc:get_parent()},
receive
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
{ready, Syncer} -> EmitStats({syncing, 0}),
- master_go0(Args, BQ, BQS)
+ master_batch_go0(Args, SyncBatchSize,
+ BQ, BQS)
end.
-master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
- master_send(Msg, MsgProps, Unacked, Args, Acc)
- end, {0, time_compat:monotonic_time()}, BQS) of
+master_batch_go0(Args, BatchSize, BQ, BQS) ->
+ FoldFun =
+ fun (Msg, MsgProps, Unacked, Acc) ->
+ Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc),
+ case maybe_master_batch_send(Acc1, BatchSize) of
+ true -> master_batch_send(Args, Acc1);
+ false -> {cont, Acc1}
+ end
+ end,
+ FoldAcc = {[], 0, {0, BQ:depth(BQS)}, time_compat:monotonic_time()},
+ bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
+
+master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
+ {Batch, I, {Curr, Len}, Last}) ->
+ T = maybe_emit_stats(Last, I, EmitStats, Log),
+ HandleInfo({syncing, I}),
+ handle_set_maximum_since_use(),
+ SyncMsg = {msgs, Ref, lists:reverse(Batch)},
+ NewAcc = {[], I + length(Batch), {Curr, Len}, T},
+ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
+
+%% Either send messages when we reach the last one in the queue or
+%% whenever we have accumulated BatchSize messages.
+maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) ->
+ true;
+maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize)
+ when Curr rem BatchSize =:= 0 ->
+ true;
+maybe_master_batch_send(_Acc, _BatchSize) ->
+ false.
+
+bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) ->
+ case BQ:fold(FoldFun, FoldAcc, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, Unacked,
- {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
- Interval = time_compat:convert_time_unit(
- time_compat:monotonic_time() - Last, native, micro_seconds),
- T = case Interval > ?SYNC_PROGRESS_INTERVAL of
- true -> EmitStats({syncing, I}),
- Log("~p messages", [I]),
- time_compat:monotonic_time();
- false -> Last
- end,
- HandleInfo({syncing, I}),
- receive
- {'$gen_cast', {set_maximum_since_use, Age}} ->
- ok = file_handle_cache:set_maximum_since_use(Age)
- after 0 ->
- ok
- end,
+append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) ->
+ {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}.
+
+master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
receive
{'$gen_call', From,
cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
gen_server2:reply(From, ok),
{stop, cancelled};
- {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
- {cont, {I + 1, T}};
+ {next, Ref} -> Syncer ! SyncMsg,
+ {cont, NewAcc};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
@@ -149,6 +167,24 @@ stop_syncer(Syncer, Msg) ->
after 0 -> ok
end.
+maybe_emit_stats(Last, I, EmitStats, Log) ->
+ Interval = time_compat:convert_time_unit(
+ time_compat:monotonic_time() - Last, native, micro_seconds),
+ case Interval > ?SYNC_PROGRESS_INTERVAL of
+ true -> EmitStats({syncing, I}),
+ Log("~p messages", [I]),
+ time_compat:monotonic_time();
+ false -> Last
+ end.
+
+handle_set_maximum_since_use() ->
+ receive
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age)
+ after 0 ->
+ ok
+ end.
+
%% Master
%% ---------------------------------------------------------------------------
%% Syncer
@@ -184,12 +220,9 @@ await_slaves(Ref, SPids) ->
syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
- {msg, Ref, Msg, MsgProps, Unacked} ->
+ {msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
- [begin
- credit_flow:send(SPid),
- SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
- end || SPid <- SPids1],
+ broadcast(SPids1, {sync_msgs, Ref, Msgs}),
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
@@ -200,6 +233,12 @@ syncer_loop(Ref, MPid, SPids) ->
[SPid ! {sync_complete, Ref} || SPid <- SPids]
end.
+broadcast(SPids, Msg) ->
+ [begin
+ credit_flow:send(SPid),
+ SPid ! Msg
+ end || SPid <- SPids].
+
wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
@@ -260,17 +299,9 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
slave_sync_loop(Args, {MA, TRef1, BQS1});
- {sync_msg, Ref, Msg, Props, Unacked} ->
+ {sync_msgs, Ref, Batch} ->
credit_flow:ack(Syncer),
- Props1 = Props#message_properties{needs_confirming = false},
- {MA1, BQS1} =
- case Unacked of
- false -> {MA,
- BQ:publish(Msg, Props1, true, none, noflow, BQS)};
- true -> {AckTag, BQS2} = BQ:publish_delivered(
- Msg, Props1, none, noflow, BQS),
- {[{Msg#basic_message.id, AckTag} | MA], BQS2}
- end,
+ {MA1, BQS1} = process_batch(Batch, MA, BQ, BQS),
slave_sync_loop(Args, {MA1, TRef, BQS1});
{'EXIT', Parent, Reason} ->
{stop, Reason, State};
@@ -279,3 +310,55 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
BQ:delete_and_terminate(Reason, BQS),
{stop, Reason, {[], TRef, undefined}}
end.
+
+%% We are partitioning messages by the Unacked element in the tuple.
+%% when unacked = true, then it's a publish_delivered message,
+%% otherwise it's a publish message.
+%%
+%% Note that we can't first partition the batch and then publish each
+%% part, since that would result in re-ordering messages, which we
+%% don't want to do.
+process_batch([], MA, _BQ, BQS) ->
+ {MA, BQS};
+process_batch(Batch, MA, BQ, BQS) ->
+ {_Msg, _MsgProps, Unacked} = hd(Batch),
+ process_batch(Batch, Unacked, [], MA, BQ, BQS).
+
+process_batch([{Msg, Props, true = Unacked} | Rest], true = Unacked,
+ Acc, MA, BQ, BQS) ->
+ %% publish_delivered messages don't need the IsDelivered flag,
+ %% therefore we just add {Msg, Props} to the accumulator.
+ process_batch(Rest, Unacked, [{Msg, props(Props)} | Acc],
+ MA, BQ, BQS);
+process_batch([{Msg, Props, false = Unacked} | Rest], false = Unacked,
+ Acc, MA, BQ, BQS) ->
+ %% publish messages needs the IsDelivered flag which is set to true
+ %% here.
+ process_batch(Rest, Unacked, [{Msg, props(Props), true} | Acc],
+ MA, BQ, BQS);
+process_batch(Batch, Unacked, Acc, MA, BQ, BQS) ->
+ {MA1, BQS1} = publish_batch(Unacked, lists:reverse(Acc), MA, BQ, BQS),
+ process_batch(Batch, MA1, BQ, BQS1).
+
+%% Unacked msgs are published via batch_publish.
+publish_batch(false, Batch, MA, BQ, BQS) ->
+ batch_publish(Batch, MA, BQ, BQS);
+%% Acked msgs are published via batch_publish_delivered.
+publish_batch(true, Batch, MA, BQ, BQS) ->
+ batch_publish_delivered(Batch, MA, BQ, BQS).
+
+
+batch_publish(Batch, MA, BQ, BQS) ->
+ BQS1 = BQ:batch_publish(Batch, none, noflow, BQS),
+ {MA, BQS1}.
+
+batch_publish_delivered(Batch, MA, BQ, BQS) ->
+ {AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS),
+ MA1 =
+ lists:foldl(fun ({{Msg, _}, AckTag}, MAs) ->
+ [{Msg#basic_message.id, AckTag} | MAs]
+ end, MA, lists:zip(Batch, AckTags)),
+ {MA1, BQS1}.
+
+props(Props) ->
+ Props#message_properties{needs_confirming = false}.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index a839badfc4..28cee163a1 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -35,6 +35,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
+ batch_publish/4, batch_publish_delivered/4,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -203,6 +204,19 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
+batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
+ PubDict = publishes_by_priority(
+ Publishes, fun ({Msg, _, _}) -> Msg end),
+ lists:foldl(
+ fun ({Priority, Pubs}, St) ->
+ pick1(fun (_P, BQSN) ->
+ BQ:batch_publish(Pubs, ChPid, Flow, BQSN)
+ end, Priority, St)
+ end, State, orddict:to_list(PubDict));
+batch_publish(Publishes, ChPid, Flow,
+ State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)).
+
publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) ->
pick2(fun (P, BQSN) ->
{AckTag, BQSN1} = BQ:publish_delivered(
@@ -213,6 +227,26 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
+batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
+ PubDict = publishes_by_priority(
+ Publishes, fun ({Msg, _}) -> Msg end),
+ {PrioritiesAndAcks, State1} =
+ lists:foldl(
+ fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
+ {PriosAndAcks1, St1} =
+ pick2(fun (P, BQSN) ->
+ {AckTags, BQSN1} =
+ BQ:batch_publish_delivered(
+ Pubs, ChPid, Flow, BQSN),
+ {{P, AckTags}, BQSN1}
+ end, Priority, St),
+ {[PriosAndAcks1 | PriosAndAcks], St1}
+ end, {[], State}, orddict:to_list(PubDict)),
+ {lists:reverse(PrioritiesAndAcks), State1};
+batch_publish_delivered(Publishes, ChPid, Flow,
+ State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)).
+
%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
%% to the appropriate sub-BQ. But we don't so we are stuck.
@@ -532,6 +566,11 @@ a(State = #state{bqss = BQSs}) ->
end.
%%----------------------------------------------------------------------------
+publishes_by_priority(Publishes, ExtractMsg) ->
+ lists:foldl(fun (Pub, Dict) ->
+ Msg = ExtractMsg(Pub),
+ rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict)
+ end, orddict:new(), Publishes).
priority(P, BQSs) when is_integer(P) ->
{P, bq_fetch(P, BQSs)};
@@ -540,18 +579,21 @@ priority(#basic_message{content = Content}, BQSs) ->
priority1(_Content, [{P, BQSN}]) ->
{P, BQSN};
-priority1(Content = #content{properties = Props},
- [{P, BQSN} | Rest]) ->
- #'P_basic'{priority = Priority0} = Props,
- Priority = case Priority0 of
- undefined -> 0;
- _ when is_integer(Priority0) -> Priority0
- end,
- case Priority >= P of
+priority1(Content, [{P, BQSN} | Rest]) ->
+ case priority2(Content) >= P of
true -> {P, BQSN};
false -> priority1(Content, Rest)
end.
+priority2(#basic_message{content = Content}) ->
+ priority2(rabbit_binary_parser:ensure_content_decoded(Content));
+priority2(#content{properties = Props}) ->
+ #'P_basic'{priority = Priority0} = Props,
+ case Priority0 of
+ undefined -> 0;
+ _ when is_integer(Priority0) -> Priority0
+ end.
+
add_maybe_infinity(infinity, _) -> infinity;
add_maybe_infinity(_, infinity) -> infinity;
add_maybe_infinity(A, B) -> A + B.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a0e28e88f6..5e7672e7f0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,9 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
- publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
+ publish/6, publish_delivered/5,
+ batch_publish/4, batch_publish_delivered/4,
+ discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -558,52 +560,32 @@ purge(State = #vqstate { len = Len }) ->
purge_acks(State) -> a(purge_pending_ack(false, State)).
-publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, _Flow,
- State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case ?QUEUE:is_empty(Q3) of
- false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
- end,
- InCount1 = InCount + 1,
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({1, 0}, {none, MsgStatus1},
- State2#vqstate{ next_seq_id = SeqId + 1,
- in_counter = InCount1,
- unconfirmed = UC1 }),
- a(reduce_memory_use(maybe_update_rates(State3))).
-
-publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
- MsgProps = #message_properties {
- needs_confirming = NeedsConfirming },
- _ChPid, _Flow,
- State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = record_pending_ack(m(MsgStatus1), State1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1},
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }),
- {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
+ State1 =
+ publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ a(reduce_memory_use(maybe_update_rates(State1))).
+
+batch_publish(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, State1} =
+ lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
+ State2 = ui(State1),
+ a(reduce_memory_use(maybe_update_rates(State2))).
+
+publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
+ {SeqId, State1} =
+ publish_delivered1(Msg, MsgProps, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+
+batch_publish_delivered(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, SeqIds, State1} =
+ lists:foldl(fun batch_publish_delivered1/2,
+ {ChPid, Flow, [], State}, Publishes),
+ State2 = ui(State1),
+ {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
discard(_MsgId, _ChPid, _Flow, State) -> State.
@@ -1563,6 +1545,62 @@ process_delivers_and_acks_fun(_) ->
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
+publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ IsDelivered, _ChPid, _Flow, PersistFun,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
+ State2 = case ?QUEUE:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
+ end,
+ InCount1 = InCount + 1,
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ stats({1, 0}, {none, MsgStatus1},
+ State2#vqstate{ next_seq_id = SeqId + 1,
+ in_counter = InCount1,
+ unconfirmed = UC1 }).
+
+batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
+ {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_prepare_write_to_disk/4, State)}.
+
+publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
+ MsgProps = #message_properties {
+ needs_confirming = NeedsConfirming },
+ _ChPid, _Flow, PersistFun,
+ State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ State3 = stats({0, 1}, {none, MsgStatus1},
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
+ {SeqId, State3}.
+
+batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
+ {SeqId, State1} =
+ publish_delivered1(Msg, MsgProps, ChPid, Flow,
+ fun maybe_prepare_write_to_disk/4,
+ State),
+ {ChPid, Flow, [SeqId | SeqIds], State1}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_in_store = true }, State) ->