diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-20 18:10:41 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-20 18:10:41 +0000 |
| commit | 2a6a784fd6088f01b6752fb94499aa539388836d (patch) | |
| tree | edb016dbde2a8a7844b3b8d4a56a399216fe4b8d | |
| parent | 961df06526ea039207871359b2d5fc859030d134 (diff) | |
| download | rabbitmq-server-git-2a6a784fd6088f01b6752fb94499aa539388836d.tar.gz | |
Read segment files in one call to fhc:read/2.
| -rw-r--r-- | src/rabbit_queue_index.erl | 62 |
1 files changed, 27 insertions, 35 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index df8c3acb9a..0dfbc1ffd7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -953,46 +953,38 @@ load_segment(KeepAcked, #segment { path = Path }) -> Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of false -> Empty; - true -> {ok, Hdl} = file_handle_cache:open( - Path, ?READ_MODE, - [{read_buffer, ?READ_BUFFER_SIZE}]), + true -> Size = rabbit_file:file_size(Path), + file_handle_cache_stats:update(queue_index_read), + {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = load_segment_entries(Hdl, KeepAcked, Empty), + {ok, SegBin} = file_handle_cache:read(Hdl, Size), ok = file_handle_cache:close(Hdl), + Res = parse_segment_entries(SegBin, KeepAcked, Empty), Res end. -load_segment_entries(Hdl, KeepAcked, Acc) -> - file_handle_cache_stats:update(queue_index_read), - case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of - {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS, - IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} -> - load_segment_entries( - Hdl, KeepAcked, - load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc)); - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> - load_segment_entries( - Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); - eof -> %% TODO or maybe _ - Acc - end. - -load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of - {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary, - MsgSize:?EMBEDDED_SIZE_BITS>>} -> - case read_msg(MsgSize, Hdl) of - {ok, MsgBin} -> - Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - {SegEntries1, Unacked + 1}; - _ -> - {SegEntries, Unacked} - end; - _ -> - {SegEntries, Unacked} - end. +parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>, + KeepAcked, Acc) -> + parse_segment_publish_entry( + Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc); +parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) -> + parse_segment_entries( + Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); +parse_segment_entries(<<>>, _KeepAcked, Acc) -> + Acc. + +parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, + MsgBin:MsgSize/binary, Rest/binary>>, + IsPersistent, RelSeq, KeepAcked, + {SegEntries, Unacked}) -> + Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1}); +parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) -> + parse_segment_entries(Rest, KeepAcked, Acc). add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> case array:get(RelSeq, SegEntries) of |
