summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-20 18:10:41 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-20 18:10:41 +0000
commit2a6a784fd6088f01b6752fb94499aa539388836d (patch)
treeedb016dbde2a8a7844b3b8d4a56a399216fe4b8d
parent961df06526ea039207871359b2d5fc859030d134 (diff)
downloadrabbitmq-server-git-2a6a784fd6088f01b6752fb94499aa539388836d.tar.gz
Read segment files in one call to fhc:read/2.
-rw-r--r--src/rabbit_queue_index.erl62
1 files changed, 27 insertions, 35 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index df8c3acb9a..0dfbc1ffd7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -953,46 +953,38 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(
- Path, ?READ_MODE,
- [{read_buffer, ?READ_BUFFER_SIZE}]),
+ true -> Size = rabbit_file:file_size(Path),
+ file_handle_cache_stats:update(queue_index_read),
+ {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = load_segment_entries(Hdl, KeepAcked, Empty),
+ {ok, SegBin} = file_handle_cache:read(Hdl, Size),
ok = file_handle_cache:close(Hdl),
+ Res = parse_segment_entries(SegBin, KeepAcked, Empty),
Res
end.
-load_segment_entries(Hdl, KeepAcked, Acc) ->
- file_handle_cache_stats:update(queue_index_read),
- case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of
- {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} ->
- load_segment_entries(
- Hdl, KeepAcked,
- load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc));
- {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
- load_segment_entries(
- Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
- eof -> %% TODO or maybe _
- Acc
- end.
-
-load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of
- {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary,
- MsgSize:?EMBEDDED_SIZE_BITS>>} ->
- case read_msg(MsgSize, Hdl) of
- {ok, MsgBin} ->
- Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
- SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- {SegEntries1, Unacked + 1};
- _ ->
- {SegEntries, Unacked}
- end;
- _ ->
- {SegEntries, Unacked}
- end.
+parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
+ KeepAcked, Acc) ->
+ parse_segment_publish_entry(
+ Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
+parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
+ parse_segment_entries(
+ Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+parse_segment_entries(<<>>, _KeepAcked, Acc) ->
+ Acc.
+
+parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS,
+ MsgBin:MsgSize/binary, Rest/binary>>,
+ IsPersistent, RelSeq, KeepAcked,
+ {SegEntries, Unacked}) ->
+ Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
+parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
+ parse_segment_entries(Rest, KeepAcked, Acc).
add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
case array:get(RelSeq, SegEntries) of