diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 52 |
1 files changed, 25 insertions, 27 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index aee91f5db0..b4e6b8b106 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1663,7 +1663,7 @@ load_messages(Left, [], State) -> load_messages(Left, [File|Files], State = #dqstate { file_summary = FileSummary }) -> %% [{MsgId, TotalSize, FileOffset}] - {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), + {ok, Messages} = scan_file_for_valid_messages(File), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case length(mnesia:dirty_index_match_object @@ -1718,19 +1718,21 @@ verify_messages_in_mnesia(MsgIds) -> grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) -> MsgId. +scan_file_for_valid_messages_msg_ids(File) -> + {ok, Messages} = scan_file_for_valid_messages(File), + {ok, Messages, lists:map(fun grab_msg_id/1, Messages)}. + recover_crashed_compactions1(Files, TmpFile) -> NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFile, Files), %% [{MsgId, TotalSize, FileOffset}] - {ok, UncorruptedMessagesTmp} = - scan_file_for_valid_messages(form_filename(TmpFile)), - MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp), + {ok, UncorruptedMessagesTmp, MsgIdsTmp} = + scan_file_for_valid_messages_msg_ids(TmpFile), %% all of these messages should appear in the mnesia table, %% otherwise they wouldn't have been copied out verify_messages_in_mnesia(MsgIdsTmp), - {ok, UncorruptedMessages} = - scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), - MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages), + {ok, UncorruptedMessages, MsgIds} = + scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile), %% 1) It's possible that everything in the tmp file is also in the %% main file such that the main file is (prefix ++ %% tmpfile). This means that compaction failed immediately @@ -1798,9 +1800,8 @@ recover_crashed_compactions1(Files, TmpFile) -> ok = file:close(TmpHdl), ok = file:delete(TmpFile), - {ok, MainMessages} = - scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), - MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages), + {ok, _MainMessages, MsgIdsMain} = + scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile), %% check that everything in MsgIds is in MsgIdsMain true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end, MsgIds), @@ -1877,7 +1878,8 @@ read_message_from_disk(FileHdl, TotalSize) -> end. scan_file_for_valid_messages(File) -> - case file:open(File, [raw, binary, read]) of + FilePath = form_filename(File), + case file:open(FilePath, [raw, binary, read]) of {ok, Hdl} -> Valid = scan_file_for_valid_messages(Hdl, 0, []), %% if something really bad's happened, the close could fail, but ignore @@ -1889,10 +1891,10 @@ scan_file_for_valid_messages(File) -> scan_file_for_valid_messages(FileHdl, Offset, Acc) -> case read_next_file_entry(FileHdl, Offset) of - {ok, eof} -> {ok, Acc}; - {ok, {corrupted, NextOffset}} -> + eof -> {ok, Acc}; + {corrupted, NextOffset} -> scan_file_for_valid_messages(FileHdl, NextOffset, Acc); - {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} -> + {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} -> scan_file_for_valid_messages( FileHdl, NextOffset, [{MsgId, IsPersistent, TotalSize, Offset} | Acc]); @@ -1907,16 +1909,16 @@ read_next_file_entry(FileHdl, Offset) -> {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> case {TotalSize =:= 0, MsgIdBinSize =:= 0} of - {true, _} -> {ok, eof}; %% Nothing we can do other than stop + {true, _} -> eof; %% Nothing we can do other than stop {false, true} -> %% current message corrupted, try skipping past it ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, case file:position(FileHdl, {cur, TotalSize + 1}) of {ok, ExpectedAbsPos} -> - {ok, {corrupted, ExpectedAbsPos}}; + {corrupted, ExpectedAbsPos}; {ok, _SomeOtherPos} -> - {ok, eof}; %% seek failed, so give up + eof; %% seek failed, so give up KO -> KO end; {false, false} -> %% all good, let's continue @@ -1933,27 +1935,23 @@ read_next_file_entry(FileHdl, Offset) -> case file:read(FileHdl, 1) of {ok, <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> - {ok, - {ok, binary_to_term(MsgId), - false, TotalSize, NextOffset}}; + {ok, {binary_to_term(MsgId), + false, TotalSize, NextOffset}}; {ok, <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> - {ok, - {ok, binary_to_term(MsgId), - true, TotalSize, NextOffset}}; + {ok, {binary_to_term(MsgId), + true, TotalSize, NextOffset}}; {ok, _SomeOtherData} -> - {ok, {corrupted, NextOffset}}; + {corrupted, NextOffset}; KO -> KO end; {ok, _SomeOtherPos} -> %% seek failed, so give up - {ok, eof}; + eof; KO -> KO end; - eof -> {ok, eof}; KO -> KO end end; - eof -> {ok, eof}; KO -> KO end. |
