diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-20 17:29:16 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-20 17:29:16 +0000 |
| commit | 961df06526ea039207871359b2d5fc859030d134 (patch) | |
| tree | eab797cb80277dda27954186799421adf659867c | |
| parent | 79b28c03bea34d1536344201cf2e839971eb7fe1 (diff) | |
| download | rabbitmq-server-git-961df06526ea039207871359b2d5fc859030d134.tar.gz | |
Write segment files in one append/2 operation.
| -rw-r--r-- | src/rabbit_queue_index.erl | 59 |
1 files changed, 29 insertions, 30 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 77d24f6f7f..df8c3acb9a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -710,9 +710,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, + _ -> Seg = array:sparse_foldr( + fun entry_to_segment/3, [], JEntries), + file_handle_cache_stats:update(queue_index_write), + + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), - array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), + file_handle_cache:append(Hdl, Seg), ok = file_handle_cache:close(Hdl), Segment #segment { journal_entries = array_new() } end. @@ -893,34 +897,29 @@ segment_nums({Segments, CachedSegments}) -> segments_new() -> {dict:new(), []}. -write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) -> - Hdl; -write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> - ok = case Pub of - no_pub -> - ok; - {IsPersistent, Bin, MsgBin} -> - file_handle_cache_stats:update(queue_index_write), - file_handle_cache:append( - Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, Bin/binary, - (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]) - end, - ok = case {Del, Ack} of - {no_del, no_ack} -> - ok; - _ -> - Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - file_handle_cache_stats:update(queue_index_write), - file_handle_cache:append( - Hdl, case {Del, Ack} of - {del, ack} -> [Binary, Binary]; - _ -> Binary - end) - end, - Hdl. +entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> + Buf; +entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> + Buf1 = case Pub of + no_pub -> + Buf; + {IsPersistent, Bin, MsgBin} -> + [[<<?PUB_PREFIX:?PUB_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf] + end, + case {Del, Ack} of + {no_del, no_ack} -> + Buf1; + _ -> + Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + case {Del, Ack} of + {del, ack} -> [[Binary, Binary] | Buf]; + _ -> [Binary | Buf] + end + end. read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> |
