diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-08-29 10:32:42 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-08-29 10:32:42 +0100 |
| commit | e30eba75c782f5b6340fd7ebae835665cd336ea9 (patch) | |
| tree | 0f6555f6c862e20e581e705ad11432735e47b8fe | |
| parent | ef2d908ab3b26aea0e832aa7197c877d93ff52e7 (diff) | |
| download | rabbitmq-server-git-e30eba75c782f5b6340fd7ebae835665cd336ea9.tar.gz | |
banish ?FILE_PACKING_ADJUSTMENT from all but three functions
The details of the message packing are opaque to high level code, as
they should be. The TotalSize that code sees now is the total size of
the message on disk, including all packing adjustments, which is all
that is ever needed to perform all the necessary file positioning etc
at that level.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 62 |
1 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index d3bae0bb4c..4205dca590 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -923,7 +923,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, {offset, Offset}, {read, Rest}]}}) end, - {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res} + {Offset + TotalSize, Res} end, State), Message = #basic_message {} = bin_to_msg(MsgBody), ok = if RefCount > 1 -> @@ -1003,10 +1003,8 @@ remove_message(MsgId, Files, ets:lookup(FileSummary, File), ContiguousTop1 = lists:min([ContiguousTop, Offset]), true = - ets:insert(FileSummary, - {File, - (ValidTotalSize-TotalSize-?FILE_PACKING_ADJUSTMENT), - ContiguousTop1, Left, Right}), + ets:insert(FileSummary, {File, ValidTotalSize - TotalSize, + ContiguousTop1, Left, Right}), if CurName =:= File -> Files; true -> sets:add_element(File, Files) end; @@ -1036,8 +1034,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, is_persistent = IsPersistent }), [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] = ets:lookup(FileSummary, CurName), - ValidTotalSize1 = ValidTotalSize + TotalSize + - ?FILE_PACKING_ADJUSTMENT, + ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> %% can't be any holes in this file ValidTotalSize1; @@ -1045,7 +1042,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, end, true = ets:insert(FileSummary, {CurName, ValidTotalSize1, ContiguousTop1, Left, undefined}), - NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + NextOffset = CurOffset + TotalSize, maybe_roll_to_new_file( NextOffset, State #dqstate {current_offset = NextOffset, current_dirty = true}); @@ -1437,19 +1434,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile - Size = TotalSize + ?FILE_PACKING_ADJUSTMENT, %% update MsgLocationDets to reflect change of file and offset ok = dets_ets_insert(State, StoreEntry #message_store_entry { file = Destination, offset = CurOffset }), - NextOffset = CurOffset + Size, + NextOffset = CurOffset + TotalSize, if BlockStart =:= undefined -> %% base case, called only for the first list elem - {NextOffset, Offset, Offset + Size}; + {NextOffset, Offset, Offset + TotalSize}; Offset =:= BlockEnd -> %% extend the current block because the next %% msg follows straight on - {NextOffset, BlockStart, BlockEnd + Size}; + {NextOffset, BlockStart, BlockEnd + TotalSize}; true -> %% found a gap, so actually do the work for %% the previous block @@ -1458,7 +1454,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, file:position(SourceHdl, BlockStart), {ok, BSize} = file:copy(SourceHdl, DestinationHdl, BSize), - {NextOffset, Offset, Offset + Size} + {NextOffset, Offset, Offset + TotalSize} end end, {InitOffset, undefined, undefined}, WorkList), %% do the last remaining block @@ -1729,7 +1725,7 @@ load_messages(Left, [], State) -> offset = MaxOffset, total_size = TotalSize} | _ ] = sort_msg_locations_by_offset(desc, L), - MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT + MaxOffset + TotalSize end, State #dqstate { current_file_num = Num, current_file_name = Left, current_offset = Offset }; @@ -1752,7 +1748,7 @@ load_messages(Left, [File|Files], total_size = TotalSize, is_persistent = IsPersistent }), {[Obj | VMAcc], - VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT + VTSAcc + TotalSize } end end, {[], 0}, Messages), @@ -1854,7 +1850,7 @@ recover_crashed_compactions1(Files, TmpFile) -> %% Remember the head of the list will be the highest entry %% in the file [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT, + TmpSize = TmpTopOffset + TmpTopTotalSize, ExpectedAbsPos = Top + TmpSize, {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}), %% and now extend the main file as big as necessary in a @@ -1890,7 +1886,7 @@ find_contiguous_block_prefix([], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}; find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset} | Tail], ExpectedOffset, MsgIds) -> - ExpectedOffset1 = ExpectedOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT, + ExpectedOffset1 = ExpectedOffset + TotalSize, find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]); find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. @@ -1915,27 +1911,28 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> BodySize = size(MsgBody), MsgIdBin = term_to_binary(MsgId), MsgIdBinSize = size(MsgIdBin), - TotalSize = BodySize + MsgIdBinSize, + Size = BodySize + MsgIdBinSize, StopByte = case IsPersistent of true -> ?WRITE_OK_PERSISTENT; false -> ?WRITE_OK_TRANSIENT end, - case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS, + case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, MsgIdBin:MsgIdBinSize/binary, MsgBody:BodySize/binary, StopByte:?WRITE_OK_SIZE_BITS>>) of - ok -> {ok, TotalSize}; + ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; KO -> KO end. read_message_from_disk(FileHdl, TotalSize) -> - TotalSizeWriteOkBytes = TotalSize + 1, - case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of - {ok, <<TotalSize:?INTEGER_SIZE_BITS, + Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, + SizeWriteOkBytes = Size + 1, + case file:read(FileHdl, TotalSize) of + {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS, - Rest:TotalSizeWriteOkBytes/binary>>} -> - BodySize = TotalSize - MsgIdBinSize, + Rest:SizeWriteOkBytes/binary>>} -> + BodySize = Size - MsgIdBinSize, <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, StopByte:?WRITE_OK_SIZE_BITS>> = Rest, Persistent = case StopByte of @@ -1976,14 +1973,13 @@ read_next_file_entry(FileHdl, Offset) -> TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, case file:read(FileHdl, TwoIntegers) of {ok, - <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> - case {TotalSize, MsgIdBinSize} of + <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> + case {Size, MsgIdBinSize} of {0, _} -> eof; %% Nothing we can do other than stop {_, 0} -> %% current message corrupted, try skipping past it - ExpectedAbsPos = - Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize, - case file:position(FileHdl, {cur, TotalSize + 1}) of + ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, + case file:position(FileHdl, {cur, Size + 1}) of {ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos}; {ok, _SomeOtherPos} -> eof; %% seek failed, so give up KO -> KO @@ -1991,10 +1987,10 @@ read_next_file_entry(FileHdl, Offset) -> {_, _} -> %% all good, let's continue case file:read(FileHdl, MsgIdBinSize) of {ok, <<MsgIdBin:MsgIdBinSize/binary>>} -> - ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + - TotalSize - 1, + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + ExpectedAbsPos = Offset + TotalSize - 1, case file:position( - FileHdl, {cur, TotalSize - MsgIdBinSize}) of + FileHdl, {cur, Size - MsgIdBinSize}) of {ok, ExpectedAbsPos} -> NextOffset = ExpectedAbsPos + 1, case read_stop_byte(FileHdl) of |
