diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 62 |
1 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index d3bae0bb4c..4205dca590 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -923,7 +923,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, {offset, Offset}, {read, Rest}]}}) end, - {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res} + {Offset + TotalSize, Res} end, State), Message = #basic_message {} = bin_to_msg(MsgBody), ok = if RefCount > 1 -> @@ -1003,10 +1003,8 @@ remove_message(MsgId, Files, ets:lookup(FileSummary, File), ContiguousTop1 = lists:min([ContiguousTop, Offset]), true = - ets:insert(FileSummary, - {File, - (ValidTotalSize-TotalSize-?FILE_PACKING_ADJUSTMENT), - ContiguousTop1, Left, Right}), + ets:insert(FileSummary, {File, ValidTotalSize - TotalSize, + ContiguousTop1, Left, Right}), if CurName =:= File -> Files; true -> sets:add_element(File, Files) end; @@ -1036,8 +1034,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, is_persistent = IsPersistent }), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), - ValidTotalSize1 = ValidTotalSize + TotalSize + - ?FILE_PACKING_ADJUSTMENT, + ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> %% can't be any holes in this file ValidTotalSize1; @@ -1045,7 +1042,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, end, true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}), - NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + NextOffset = CurOffset + TotalSize, maybe_roll_to_new_file( NextOffset, State #dqstate {current_offset = NextOffset, current_dirty = true}); @@ -1437,19 +1434,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile - Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, %% update MsgLocationDets to reflect change of file and offset ok = dets_ets_insert(State, StoreEntry #message_store_entry { file = Destination, offset = CurOffset }), - NextOffset = CurOffset + Size, + NextOffset = CurOffset + TotalSize, if BlockStart =:= undefined -> %% base case, called only for the first list elem - {NextOffset, Offset, Offset + Size}; + {NextOffset, Offset, Offset + TotalSize}; Offset =:= BlockEnd -> %% extend the current block because the next %% msg follows straight on - {NextOffset, BlockStart, BlockEnd + Size}; + {NextOffset, BlockStart, BlockEnd + TotalSize}; true -> %% found a gap, so actually do the work for %% the previous block @@ -1458,7 +1454,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, file:position(SourceHdl, BlockStart), {ok, BSize} = file:copy(SourceHdl, DestinationHdl, BSize), - {NextOffset, Offset, Offset + Size} + {NextOffset, Offset, Offset + TotalSize} end end, {InitOffset, undefined, undefined}, WorkList), %% do the last remaining block @@ -1729,7 +1725,7 @@ load_messages(Left, [], State) -> offset = MaxOffset, total_size = TotalSize} | _ ] = sort_msg_locations_by_offset(desc, L), - MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT + MaxOffset + TotalSize end, State #dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset }; @@ -1752,7 +1748,7 @@ load_messages(Left, [File|Files], total_size = TotalSize, is_persistent = IsPersistent }), {[Obj | VMAcc], - VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT + VTSAcc + TotalSize } end end, {[], 0}, Messages), @@ -1854,7 +1850,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% Remember the head of the list will be the highest entry %% in the file [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, + TmpSize = TmpTopOffset + TmpTopTotalSize, ExpectedAbsPos = Top + TmpSize, {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), %% and now extend the main file as big as necessary in a @@ -1890,7 +1886,7 @@ find_contiguous_block_prefix([], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}; find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset} | Tail], ExpectedOffset, MsgIds) -> - ExpectedOffset1 = ExpectedOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + ExpectedOffset1 = ExpectedOffset + TotalSize, find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]); find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. @@ -1915,27 +1911,28 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> BodySize = size(MsgBody), MsgIdBin = term_to_binary(MsgId), MsgIdBinSize = size(MsgIdBin), - TotalSize = BodySize + MsgIdBinSize, + Size = BodySize + MsgIdBinSize, StopByte = case IsPersistent of true -> ?WRITE_OK_PERSISTENT; false -> ?WRITE_OK_TRANSIENT end, - case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS, + case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, MsgIdBin:MsgIdBinSize/binary, MsgBody:BodySize/binary, StopByte:?WRITE_OK_SIZE_BITS>>) of - ok -> {ok, TotalSize}; + ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; KO -> KO end. read_message_from_disk(FileHdl, TotalSize) -> - TotalSizeWriteOkBytes = TotalSize + 1, - case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of - {ok, <<TotalSize:?INTEGER_SIZE_BITS, + Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, + SizeWriteOkBytes = Size + 1, + case file:read(FileHdl, TotalSize) of + {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, - Rest:TotalSizeWriteOkBytes/binary>>} -> - BodySize = TotalSize - MsgIdBinSize, + Rest:SizeWriteOkBytes/binary>>} -> + BodySize = Size - MsgIdBinSize, <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, StopByte:?WRITE_OK_SIZE_BITS>> = Rest, Persistent = case StopByte of @@ -1976,14 +1973,13 @@ read_next_file_entry(FileHdl, Offset) -> TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, case file:read(FileHdl, TwoIntegers) of {ok, - <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> - case {TotalSize, MsgIdBinSize} of + <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> + case {Size, MsgIdBinSize} of {0, _} -> eof; %% Nothing we can do other than stop {_, 0} -> %% current message corrupted, try skipping past it - ExpectedAbsPos = - Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, - case file:position(FileHdl, {cur, TotalSize + 1}) of + ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, + case file:position(FileHdl, {cur, Size + 1}) of {ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos}; {ok, _SomeOtherPos} -> eof; %% seek failed, so give up KO -> KO @@ -1991,10 +1987,10 @@ read_next_file_entry(FileHdl, Offset) -> {_, _} -> %% all good, let's continue case file:read(FileHdl, MsgIdBinSize) of {ok, <<MsgIdBin:MsgIdBinSize/binary>>} -> - ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + - TotalSize - 1, + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + ExpectedAbsPos = Offset + TotalSize - 1, case file:position( - FileHdl, {cur, TotalSize - MsgIdBinSize}) of + FileHdl, {cur, Size - MsgIdBinSize}) of {ok, ExpectedAbsPos} -> NextOffset = ExpectedAbsPos + 1, case read_stop_byte(FileHdl) of |
