diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-21 12:19:10 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-21 12:19:10 +0000 |
| commit | 043dd23be06db4015aaf32ffb282cf162d92c8ca (patch) | |
| tree | bb5ac7e77a8a3858b538503bf4464f7fc5402564 | |
| parent | 79b28c03bea34d1536344201cf2e839971eb7fe1 (diff) | |
| parent | 0068d17d865761cadedf4de278664f78cccccff7 (diff) | |
| download | rabbitmq-server-git-043dd23be06db4015aaf32ffb282cf162d92c8ca.tar.gz | |
Merge bug26543
| -rw-r--r-- | src/rabbit_queue_index.erl | 196 |
1 files changed, 87 insertions, 109 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 77d24f6f7f..676cb6e043 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -619,12 +619,6 @@ create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry, expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>; expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>. -read_msg(0, _Hdl) -> {ok, <<>>}; -read_msg(Size, Hdl) -> case file_handle_cache:read(Hdl, Size) of - {ok, MsgBin} -> {ok, MsgBin}; - Else -> Else - end. - parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS>>, MsgBin) -> %% work around for binary data fragmentation. See @@ -710,9 +704,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, + _ -> Seg = array:sparse_foldr( + fun entry_to_segment/3, [], JEntries), + file_handle_cache_stats:update(queue_index_write), + + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), - array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), + file_handle_cache:append(Hdl, Seg), ok = file_handle_cache:close(Hdl), Segment #segment { journal_entries = array_new() } end. @@ -731,10 +729,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. load_journal(State = #qistate { dir = Dir }) -> - case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of + Path = filename:join(Dir, ?JOURNAL_FILENAME), + case rabbit_file:is_file(Path) of true -> {JournalHdl, State1} = get_journal_handle(State), + Size = rabbit_file:file_size(Path), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1); + {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size), + parse_journal_entries(JournalBin, State1); false -> State end. @@ -758,42 +759,30 @@ recover_journal(State) -> end, Segments), State1 #qistate { segments = Segments1 }. -load_journal_entries(State = #qistate { journal_handle = Hdl }) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES) of - {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} -> - case Prefix of - ?DEL_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, del, State)); - ?ACK_JPREFIX -> - load_journal_entries(add_to_journal(SeqId, ack, State)); - _ -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of - %% Journal entry composed only of zeroes was probably - %% produced during a dirty shutdown so stop reading - {ok, <<0:?PUB_RECORD_SIZE_BYTES/unit:8>>} -> - State; - {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary, - MsgSize:?EMBEDDED_SIZE_BITS>>} -> - case read_msg(MsgSize, Hdl) of - {ok, MsgBin} -> - IsPersistent = - case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end, - load_journal_entries( - add_to_journal( - SeqId, {IsPersistent, Bin, MsgBin}, - State)); - _ErrOrEoF -> - State - end; - _ErrOrEoF -> %% err, we've lost at least a publish - State - end - end; - _ErrOrEoF -> State - end. +parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, del, State)); + +parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Rest/binary>>, State) -> + parse_journal_entries(Rest, add_to_journal(SeqId, ack, State)); +parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS, + 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) -> + %% Journal entry composed only of zeroes was probably + %% produced during a dirty shutdown so stop reading + State; +parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, + Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary, + Rest/binary>>, State) -> + IsPersistent = case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end, + parse_journal_entries( + Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State)); +parse_journal_entries(_ErrOrEoF, State) -> + State. deliver_or_ack(_Kind, [], State) -> State; @@ -893,34 +882,31 @@ segment_nums({Segments, CachedSegments}) -> segments_new() -> {dict:new(), []}. -write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) -> - Hdl; -write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> - ok = case Pub of - no_pub -> - ok; - {IsPersistent, Bin, MsgBin} -> - file_handle_cache_stats:update(queue_index_write), - file_handle_cache:append( - Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, Bin/binary, - (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]) - end, - ok = case {Del, Ack} of - {no_del, no_ack} -> - ok; - _ -> - Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - file_handle_cache_stats:update(queue_index_write), - file_handle_cache:append( - Hdl, case {Del, Ack} of - {del, ack} -> [Binary, Binary]; - _ -> Binary - end) - end, - Hdl. +entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> + Buf; +entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> + %% NB: we are assembling the segment in reverse order here, so + %% del/ack comes first. + Buf1 = case {Del, Ack} of + {no_del, no_ack} -> + Buf; + _ -> + Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + case {Del, Ack} of + {del, ack} -> [[Binary, Binary] | Buf]; + _ -> [Binary | Buf] + end + end, + case Pub of + no_pub -> + Buf1; + {IsPersistent, Bin, MsgBin} -> + [[<<?PUB_PREFIX:?PUB_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, Bin/binary, + (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1] + end. read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> @@ -954,46 +940,38 @@ load_segment(KeepAcked, #segment { path = Path }) -> Empty = {array_new(), 0}, case rabbit_file:is_file(Path) of false -> Empty; - true -> {ok, Hdl} = file_handle_cache:open( - Path, ?READ_MODE, - [{read_buffer, ?READ_BUFFER_SIZE}]), + true -> Size = rabbit_file:file_size(Path), + file_handle_cache_stats:update(queue_index_read), + {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - Res = load_segment_entries(Hdl, KeepAcked, Empty), + {ok, SegBin} = file_handle_cache:read(Hdl, Size), ok = file_handle_cache:close(Hdl), + Res = parse_segment_entries(SegBin, KeepAcked, Empty), Res end. -load_segment_entries(Hdl, KeepAcked, Acc) -> - file_handle_cache_stats:update(queue_index_read), - case file_handle_cache:read(Hdl, ?PUB_RECORD_PREFIX_BYTES) of - {ok, <<?PUB_PREFIX:?PUB_PREFIX_BITS, - IsPersistNum:1, RelSeq:?REL_SEQ_BITS>>} -> - load_segment_entries( - Hdl, KeepAcked, - load_segment_publish_entry(Hdl, 1 == IsPersistNum, RelSeq, Acc)); - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> - load_segment_entries( - Hdl, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); - eof -> %% TODO or maybe _ - Acc - end. - -load_segment_publish_entry(Hdl, IsPersistent, RelSeq, {SegEntries, Unacked}) -> - case file_handle_cache:read(Hdl, ?PUB_RECORD_SIZE_BYTES) of - {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary, - MsgSize:?EMBEDDED_SIZE_BITS>>} -> - case read_msg(MsgSize, Hdl) of - {ok, MsgBin} -> - Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, - SegEntries1 = array:set(RelSeq, Obj, SegEntries), - {SegEntries1, Unacked + 1}; - _ -> - {SegEntries, Unacked} - end; - _ -> - {SegEntries, Unacked} - end. +parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS, + IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>, + KeepAcked, Acc) -> + parse_segment_publish_entry( + Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc); +parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) -> + parse_segment_entries( + Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc)); +parse_segment_entries(<<>>, _KeepAcked, Acc) -> + Acc. + +parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary, + MsgSize:?EMBEDDED_SIZE_BITS, + MsgBin:MsgSize/binary, Rest/binary>>, + IsPersistent, RelSeq, KeepAcked, + {SegEntries, Unacked}) -> + Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, + SegEntries1 = array:set(RelSeq, Obj, SegEntries), + parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1}); +parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) -> + parse_segment_entries(Rest, KeepAcked, Acc). add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> case array:get(RelSeq, SegEntries) of |
