summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-03-28 13:43:21 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-03-28 18:04:15 +0000
commit99c5828455973c6a719a982aa453e4f946bc3f64 (patch)
tree583781f36e804c688fcb3dd73f5341a084ed065b /src
parent029594e4f3341b2949dae90b84e48e8c9fdd963c (diff)
downloadrabbitmq-server-git-99c5828455973c6a719a982aa453e4f946bc3f64.tar.gz
Batch log effect
[#164735591]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl46
1 files changed, 30 insertions, 16 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 960d8d144e..0f957b4a19 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1192,7 +1192,7 @@ return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
%% reverses the effects list
checkout(#{index := Index}, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
- Effects0, #{}),
+ Effects0, {#{}, #{}}),
case evaluate_limit(State0#?MODULE.ra_indexes, false,
State1, Effects1) of
{State, true, Effects} ->
@@ -1201,21 +1201,26 @@ checkout(#{index := Index}, State0, Effects0) ->
{State, ok, Effects}
end.
-checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, Acc) ->
- checkout0(checkout_one(State), [send_log_effect(ConsumerId, RaftIdx, MsgId, Header) | Effects], Acc);
-checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
+checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects,
+ {SendAcc, LogAcc0}) ->
+ DelMsg = {RaftIdx, {MsgId, Header}},
+ LogAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], LogAcc0),
+ checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) ->
DelMsg = {MsgId, Msg},
- Acc = maps:update_with(ConsumerId,
- fun (M) -> [DelMsg | M] end,
- [DelMsg], Acc0),
- checkout0(checkout_one(State), Effects, Acc);
-checkout0({Activity, State0}, Effects0, Acc) ->
+ SendAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], SendAcc0),
+ checkout0(checkout_one(State), Effects, {SendAcc, LogAcc});
+checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
Effects1 = case Activity of
nochange ->
- append_send_msg_effects(Effects0, Acc);
+ append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc);
inactive ->
[{aux, inactive}
- | append_send_msg_effects(Effects0, Acc)]
+ | append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc)]
end,
{State0, ok, lists:reverse(Effects1)}.
@@ -1251,6 +1256,11 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
+append_log_effects(Effects0, AccMap) ->
+ maps:fold(fun (C, Msgs, Ef) ->
+ [send_log_effect(C, lists:reverse(Msgs)) | Ef]
+ end, Effects0, AccMap).
+
%% next message is determined as follows:
%% First we check if there are are prefex returns
%% Then we check if there are current returns
@@ -1300,14 +1310,18 @@ take_next_msg(#?MODULE{returns = Returns,
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
-send_log_effect({CTag, CPid}, RaftIdx, MsgId, Header) ->
- {log, RaftIdx, fun({enqueue, _, _, Msg}) ->
- {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, Msg}}]}, ra_event}
- end}.
+send_log_effect({CTag, CPid}, IdxMsgs) ->
+ {RaftIdxs, Data} = lists:unzip(IdxMsgs),
+ {log, RaftIdxs, fun(Log) ->
+ Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}}
+ end, Log, Data),
+ {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}
+ end}.
reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{log, RaftIdx, fun({enqueue, _, _, Msg}) ->
- {wrap_reply, From, {dequeue, {MsgId, {Header, Msg}}, Ready}}
+ {reply, From, {wrap_reply, {dequeue, {MsgId, {Header, Msg}}, Ready}}}
end}.
checkout_one(#?MODULE{service_queue = SQ0,