summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-20 17:29:16 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-20 17:29:16 +0000
commit961df06526ea039207871359b2d5fc859030d134 (patch)
treeeab797cb80277dda27954186799421adf659867c
parent79b28c03bea34d1536344201cf2e839971eb7fe1 (diff)
downloadrabbitmq-server-git-961df06526ea039207871359b2d5fc859030d134.tar.gz
Write segment files in one append/2 operation.
-rw-r--r--src/rabbit_queue_index.erl59
1 files changed, 29 insertions, 30 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 77d24f6f7f..df8c3acb9a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -710,9 +710,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
+ _ -> Seg = array:sparse_foldr(
+ fun entry_to_segment/3, [], JEntries),
+ file_handle_cache_stats:update(queue_index_write),
+
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
- array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
+ file_handle_cache:append(Hdl, Seg),
ok = file_handle_cache:close(Hdl),
Segment #segment { journal_entries = array_new() }
end.
@@ -893,34 +897,29 @@ segment_nums({Segments, CachedSegments}) ->
segments_new() ->
{dict:new(), []}.
-write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) ->
- Hdl;
-write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
- ok = case Pub of
- no_pub ->
- ok;
- {IsPersistent, Bin, MsgBin} ->
- file_handle_cache_stats:update(queue_index_write),
- file_handle_cache:append(
- Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS, Bin/binary,
- (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin])
- end,
- ok = case {Del, Ack} of
- {no_del, no_ack} ->
- ok;
- _ ->
- Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- file_handle_cache_stats:update(queue_index_write),
- file_handle_cache:append(
- Hdl, case {Del, Ack} of
- {del, ack} -> [Binary, Binary];
- _ -> Binary
- end)
- end,
- Hdl.
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
+ Buf;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
+ Buf1 = case Pub of
+ no_pub ->
+ Buf;
+ {IsPersistent, Bin, MsgBin} ->
+ [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf]
+ end,
+ case {Del, Ack} of
+ {no_del, no_ack} ->
+ Buf1;
+ _ ->
+ Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ case {Del, Ack} of
+ {del, ack} -> [[Binary, Binary] | Buf];
+ _ -> [Binary | Buf]
+ end
+ end.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->