diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-03-28 13:43:21 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-03-28 18:04:15 +0000 |
| commit | 99c5828455973c6a719a982aa453e4f946bc3f64 (patch) | |
| tree | 583781f36e804c688fcb3dd73f5341a084ed065b | |
| parent | 029594e4f3341b2949dae90b84e48e8c9fdd963c (diff) | |
| download | rabbitmq-server-git-99c5828455973c6a719a982aa453e4f946bc3f64.tar.gz | |
Batch log effect
[#164735591]
| -rw-r--r-- | src/rabbit_fifo.erl | 46 |
1 files changed, 30 insertions, 16 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 960d8d144e..0f957b4a19 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1192,7 +1192,7 @@ return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> %% reverses the effects list checkout(#{index := Index}, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), - Effects0, #{}), + Effects0, {#{}, #{}}), case evaluate_limit(State0#?MODULE.ra_indexes, false, State1, Effects1) of {State, true, Effects} -> @@ -1201,21 +1201,26 @@ checkout(#{index := Index}, State0, Effects0) -> {State, ok, Effects} end. -checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, Acc) -> - checkout0(checkout_one(State), [send_log_effect(ConsumerId, RaftIdx, MsgId, Header) | Effects], Acc); -checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> +checkout0({success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, Effects, + {SendAcc, LogAcc0}) -> + DelMsg = {RaftIdx, {MsgId, Header}}, + LogAcc = maps:update_with(ConsumerId, + fun (M) -> [DelMsg | M] end, + [DelMsg], LogAcc0), + checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); +checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, {SendAcc0, LogAcc}) -> DelMsg = {MsgId, Msg}, - Acc = maps:update_with(ConsumerId, - fun (M) -> [DelMsg | M] end, - [DelMsg], Acc0), - checkout0(checkout_one(State), Effects, Acc); -checkout0({Activity, State0}, Effects0, Acc) -> + SendAcc = maps:update_with(ConsumerId, + fun (M) -> [DelMsg | M] end, + [DelMsg], SendAcc0), + checkout0(checkout_one(State), Effects, {SendAcc, LogAcc}); +checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> Effects1 = case Activity of nochange -> - append_send_msg_effects(Effects0, Acc); + append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc); inactive -> [{aux, inactive} - | append_send_msg_effects(Effects0, Acc)] + | append_send_msg_effects(append_log_effects(Effects0, LogAcc), SendAcc)] end, {State0, ok, lists:reverse(Effects1)}. @@ -1251,6 +1256,11 @@ append_send_msg_effects(Effects0, AccMap) -> end, Effects0, AccMap), [{aux, active} | Effects]. +append_log_effects(Effects0, AccMap) -> + maps:fold(fun (C, Msgs, Ef) -> + [send_log_effect(C, lists:reverse(Msgs)) | Ef] + end, Effects0, AccMap). + %% next message is determined as follows: %% First we check if there are are prefex returns %% Then we check if there are current returns @@ -1300,14 +1310,18 @@ take_next_msg(#?MODULE{returns = Returns, send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. -send_log_effect({CTag, CPid}, RaftIdx, MsgId, Header) -> - {log, RaftIdx, fun({enqueue, _, _, Msg}) -> - {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, Msg}}]}, ra_event} - end}. +send_log_effect({CTag, CPid}, IdxMsgs) -> + {RaftIdxs, Data} = lists:unzip(IdxMsgs), + {log, RaftIdxs, fun(Log) -> + Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}} + end, Log, Data), + {send_msg, CPid, {delivery, CTag, Msgs}, ra_event} + end}. reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {log, RaftIdx, fun({enqueue, _, _, Msg}) -> - {wrap_reply, From, {dequeue, {MsgId, {Header, Msg}}, Ready}} + {reply, From, {wrap_reply, {dequeue, {MsgId, {Header, Msg}}, Ready}}} end}. checkout_one(#?MODULE{service_queue = SQ0, |
