summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoïc Hoguin <lhoguin@vmware.com>2022-10-03 13:00:31 +0200
committerLoïc Hoguin <lhoguin@vmware.com>2022-10-03 13:00:31 +0200
commit6796f2a14d0110b67ae28a09a22210e8057f80c6 (patch)
tree3bdf5bf32bf61d005622081d999ea1db07b9e2fe
parente4f04e89ea66f5ec78d45864e1c02fa24027eba1 (diff)
downloadrabbitmq-server-git-loic-cq-experiments.tar.gz
DO NOT MERGE experiment spawning to write in v2 storeloic-cq-experiments
-rw-r--r--deps/rabbit/src/rabbit_classic_queue_store_v2.erl57
1 files changed, 29 insertions, 28 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl
index f76521d74b..639bf62dd6 100644
--- a/deps/rabbit/src/rabbit_classic_queue_store_v2.erl
+++ b/deps/rabbit/src/rabbit_classic_queue_store_v2.erl
@@ -194,34 +194,35 @@ maybe_flush_buffer(State = #qs{ write_buffer_size = WriteBufferSize }) ->
flush_buffer(State = #qs{ write_buffer_size = 0 }, _) ->
State;
-flush_buffer(State0 = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
- CheckCRC32 = check_crc32(),
- SegmentEntryCount = segment_entry_count(),
- %% First we prepare the writes sorted by segment.
- WriteList = lists:sort(maps:to_list(WriteBuffer)),
- Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount),
- %% Then we do the writes for each segment.
- State = lists:foldl(fun({Segment, LocBytes}, FoldState) ->
- {ok, Fd} = file:open(segment_file(Segment, FoldState), [read, write, raw, binary]),
- case file:position(Fd, eof) of
- {ok, 0} ->
- %% We write the file header if it does not exist.
- FromSeqId = Segment * SegmentEntryCount,
- ToSeqId = FromSeqId + SegmentEntryCount,
- ok = file:write(Fd,
- << ?MAGIC:32,
- ?VERSION:8,
- FromSeqId:64/unsigned,
- ToSeqId:64/unsigned,
- 0:344 >>);
- _ ->
- ok
- end,
- ok = file:pwrite(Fd, lists:sort(LocBytes)),
- FsyncFun(Fd),
- ok = file:close(Fd),
- FoldState
- end, State0, Writes),
+flush_buffer(State = #qs{ write_buffer = WriteBuffer }, FsyncFun) ->
+ spawn(fun() ->
+ CheckCRC32 = check_crc32(),
+ SegmentEntryCount = segment_entry_count(),
+ %% First we prepare the writes sorted by segment.
+ WriteList = lists:sort(maps:to_list(WriteBuffer)),
+ Writes = flush_buffer_build(WriteList, CheckCRC32, SegmentEntryCount),
+ %% Then we do the writes for each segment.
+ lists:foreach(fun({Segment, LocBytes}) ->
+ {ok, Fd} = file:open(segment_file(Segment, State), [read, write, raw, binary]),
+ case file:position(Fd, eof) of
+ {ok, 0} ->
+ %% We write the file header if it does not exist.
+ FromSeqId = Segment * SegmentEntryCount,
+ ToSeqId = FromSeqId + SegmentEntryCount,
+ ok = file:write(Fd,
+ << ?MAGIC:32,
+ ?VERSION:8,
+ FromSeqId:64/unsigned,
+ ToSeqId:64/unsigned,
+ 0:344 >>);
+ _ ->
+ ok
+ end,
+ ok = file:pwrite(Fd, lists:sort(LocBytes)),
+ FsyncFun(Fd),
+ ok = file:close(Fd)
+ end, Writes)
+ end),
%% Finally we move the write_buffer to the cache.
State#qs{ write_buffer = #{},
write_buffer_size = 0,