diff options
| -rw-r--r-- | src/rabbit.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 21 |
2 files changed, 14 insertions, 9 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index dafdaed5c9..867a72f0b6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -540,6 +540,8 @@ start_loaded_apps(Apps, RestartTypes) -> %% make Ra use a custom logger that dispatches to lager instead of the %% default OTP logger application:set_env(ra, logger_module, rabbit_log_ra_shim), + %% use a larger segments size for queues + application:set_env(ra, segment_max_entries, 32768), ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of undefined -> []; diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index f8f4e78943..6f168bf78e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1371,17 +1371,20 @@ send_msg_effect({CTag, CPid}, Msgs) -> send_log_effect({CTag, CPid}, IdxMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), - {log, RaftIdxs, fun(Log) -> - Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) -> - {MsgId, {Header, Msg}} - end, Log, Data), - {send_msg, CPid, {delivery, CTag, Msgs}, ra_event} - end}. + {log, RaftIdxs, + fun(Log) -> + Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}} + end, Log, Data), + [{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}] + end}. reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> - {log, RaftIdx, fun({enqueue, _, _, Msg}) -> - {reply, From, {wrap_reply, {dequeue, {MsgId, {Header, Msg}}, Ready}}} - end}. + {log, RaftIdx, + fun({enqueue, _, _, Msg}) -> + [{reply, From, {wrap_reply, + {dequeue, {MsgId, {Header, Msg}}, Ready}}}] + end}. checkout_one(#?MODULE{service_queue = SQ0, messages = Messages0, |
