diff options
author | Loïc Hoguin <lhoguin@vmware.com> | 2022-10-03 13:00:31 +0200 |
---|---|---|
committer | Loïc Hoguin <lhoguin@vmware.com> | 2022-10-03 13:00:31 +0200 |
commit | 6796f2a14d0110b67ae28a09a22210e8057f80c6 (patch) | |
tree | 3bdf5bf32bf61d005622081d999ea1db07b9e2fe | |
parent | e4f04e89ea66f5ec78d45864e1c02fa24027eba1 (diff) | |
download | rabbitmq-server-git-loic-cq-experiments.tar.gz |
DO NOT MERGE experiment spawning to write in v2 storeloic-cq-experiments
-rw-r--r-- | deps/rabbit/src/rabbit_classic_queue_store_v2.erl | 57 |
1 files changed, 29 insertions, 28 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl index f76521d74b..639bf62dd6 100644 --- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl @@ -194,34 +194,35 @@ maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) -> flush_buffer(State = #qs{ write_buffer_size = 0 }, _) -> State; -flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) -> - CheckCRC32 = check_crc32(), - SegmentEntryCount = segment_entry_count(), - %% First we prepare the writes sorted by segment. - WriteList = lists:sort(maps:to_list(WriteBuffer)), - Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount), - %% Then we do the writes for each segment. - State = lists:foldl(fun({Segment, LocBytes}, FoldState) -> - {ok, Fd} = file:open(segment_file(Segment, FoldState), [read, write, raw, binary]), - case file:position(Fd, eof) of - {ok, 0} -> - %% We write the file header if it does not exist. - FromSeqId = Segment * SegmentEntryCount, - ToSeqId = FromSeqId + SegmentEntryCount, - ok = file:write(Fd, - << ?MAGIC:32, - ?VERSION:8, - FromSeqId:64/unsigned, - ToSeqId:64/unsigned, - 0:344 >>); - _ -> - ok - end, - ok = file:pwrite(Fd, lists:sort(LocBytes)), - FsyncFun(Fd), - ok = file:close(Fd), - FoldState - end, State0, Writes), +flush_buffer(State = #qs{ write_buffer = WriteBuffer }, FsyncFun) -> + spawn(fun() -> + CheckCRC32 = check_crc32(), + SegmentEntryCount = segment_entry_count(), + %% First we prepare the writes sorted by segment. + WriteList = lists:sort(maps:to_list(WriteBuffer)), + Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount), + %% Then we do the writes for each segment. + lists:foreach(fun({Segment, LocBytes}) -> + {ok, Fd} = file:open(segment_file(Segment, State), [read, write, raw, binary]), + case file:position(Fd, eof) of + {ok, 0} -> + %% We write the file header if it does not exist. + FromSeqId = Segment * SegmentEntryCount, + ToSeqId = FromSeqId + SegmentEntryCount, + ok = file:write(Fd, + << ?MAGIC:32, + ?VERSION:8, + FromSeqId:64/unsigned, + ToSeqId:64/unsigned, + 0:344 >>); + _ -> + ok + end, + ok = file:pwrite(Fd, lists:sort(LocBytes)), + FsyncFun(Fd), + ok = file:close(Fd) + end, Writes) + end), %% Finally we move the write_buffer to the cache. State#qs{ write_buffer = #{}, write_buffer_size = 0, |