summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl52
1 files changed, 25 insertions, 27 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index aee91f5db0..b4e6b8b106 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1663,7 +1663,7 @@ load_messages(Left, [], State) ->
load_messages(Left, [File|Files],
State = #dqstate { file_summary = FileSummary }) ->
%% [{MsgId, TotalSize, FileOffset}]
- {ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
+ {ok, Messages} = scan_file_for_valid_messages(File),
{ValidMessages, ValidTotalSize} = lists:foldl(
fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case length(mnesia:dirty_index_match_object
@@ -1718,19 +1718,21 @@ verify_messages_in_mnesia(MsgIds) ->
grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) ->
MsgId.
+scan_file_for_valid_messages_msg_ids(File) ->
+ {ok, Messages} = scan_file_for_valid_messages(File),
+ {ok, Messages, lists:map(fun grab_msg_id/1, Messages)}.
+
recover_crashed_compactions1(Files, TmpFile) ->
NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFile, Files),
%% [{MsgId, TotalSize, FileOffset}]
- {ok, UncorruptedMessagesTmp} =
- scan_file_for_valid_messages(form_filename(TmpFile)),
- MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp),
+ {ok, UncorruptedMessagesTmp, MsgIdsTmp} =
+ scan_file_for_valid_messages_msg_ids(TmpFile),
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
verify_messages_in_mnesia(MsgIdsTmp),
- {ok, UncorruptedMessages} =
- scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages),
+ {ok, UncorruptedMessages, MsgIds} =
+ scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1798,9 +1800,8 @@ recover_crashed_compactions1(Files, TmpFile) ->
ok = file:close(TmpHdl),
ok = file:delete(TmpFile),
- {ok, MainMessages} =
- scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages),
+ {ok, _MainMessages, MsgIdsMain} =
+ scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile),
%% check that everything in MsgIds is in MsgIdsMain
true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
MsgIds),
@@ -1877,7 +1878,8 @@ read_message_from_disk(FileHdl, TotalSize) ->
end.
scan_file_for_valid_messages(File) ->
- case file:open(File, [raw, binary, read]) of
+ FilePath = form_filename(File),
+ case file:open(FilePath, [raw, binary, read]) of
{ok, Hdl} ->
Valid = scan_file_for_valid_messages(Hdl, 0, []),
%% if something really bad's happened, the close could fail, but ignore
@@ -1889,10 +1891,10 @@ scan_file_for_valid_messages(File) ->
scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
case read_next_file_entry(FileHdl, Offset) of
- {ok, eof} -> {ok, Acc};
- {ok, {corrupted, NextOffset}} ->
+ eof -> {ok, Acc};
+ {corrupted, NextOffset} ->
scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
- {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} ->
+ {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} ->
scan_file_for_valid_messages(
FileHdl, NextOffset,
[{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
@@ -1907,16 +1909,16 @@ read_next_file_entry(FileHdl, Offset) ->
{ok,
<<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
- {true, _} -> {ok, eof}; %% Nothing we can do other than stop
+ {true, _} -> eof; %% Nothing we can do other than stop
{false, true} ->
%% current message corrupted, try skipping past it
ExpectedAbsPos =
Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
case file:position(FileHdl, {cur, TotalSize + 1}) of
{ok, ExpectedAbsPos} ->
- {ok, {corrupted, ExpectedAbsPos}};
+ {corrupted, ExpectedAbsPos};
{ok, _SomeOtherPos} ->
- {ok, eof}; %% seek failed, so give up
+ eof; %% seek failed, so give up
KO -> KO
end;
{false, false} -> %% all good, let's continue
@@ -1933,27 +1935,23 @@ read_next_file_entry(FileHdl, Offset) ->
case file:read(FileHdl, 1) of
{ok,
<<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} ->
- {ok,
- {ok, binary_to_term(MsgId),
- false, TotalSize, NextOffset}};
+ {ok, {binary_to_term(MsgId),
+ false, TotalSize, NextOffset}};
{ok,
<<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} ->
- {ok,
- {ok, binary_to_term(MsgId),
- true, TotalSize, NextOffset}};
+ {ok, {binary_to_term(MsgId),
+ true, TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
- {ok, {corrupted, NextOffset}};
+ {corrupted, NextOffset};
KO -> KO
end;
{ok, _SomeOtherPos} ->
%% seek failed, so give up
- {ok, eof};
+ eof;
KO -> KO
end;
- eof -> {ok, eof};
KO -> KO
end
end;
- eof -> {ok, eof};
KO -> KO
end.