summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl62
1 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index d3bae0bb4c..4205dca590 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -923,7 +923,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
{offset, Offset},
{read, Rest}]}})
end,
- {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res}
+ {Offset + TotalSize, Res}
end, State),
Message = #basic_message {} = bin_to_msg(MsgBody),
ok = if RefCount > 1 ->
@@ -1003,10 +1003,8 @@ remove_message(MsgId, Files,
ets:lookup(FileSummary, File),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
true =
- ets:insert(FileSummary,
- {File,
- (ValidTotalSize-TotalSize-?FILE_PACKING_ADJUSTMENT),
- ContiguousTop1, Left, Right}),
+ ets:insert(FileSummary, {File, ValidTotalSize - TotalSize,
+ ContiguousTop1, Left, Right}),
if CurName =:= File -> Files;
true -> sets:add_element(File, Files)
end;
@@ -1036,8 +1034,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
is_persistent = IsPersistent }),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
ets:lookup(FileSummary, CurName),
- ValidTotalSize1 = ValidTotalSize + TotalSize +
- ?FILE_PACKING_ADJUSTMENT,
+ ValidTotalSize1 = ValidTotalSize + TotalSize,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
%% can't be any holes in this file
ValidTotalSize1;
@@ -1045,7 +1042,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
end,
true = ets:insert(FileSummary, {CurName, ValidTotalSize1,
ContiguousTop1, Left, undefined}),
- NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ NextOffset = CurOffset + TotalSize,
maybe_roll_to_new_file(
NextOffset, State #dqstate {current_offset = NextOffset,
current_dirty = true});
@@ -1437,19 +1434,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
- Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
%% update MsgLocationDets to reflect change of file and offset
ok = dets_ets_insert(State, StoreEntry #message_store_entry
{ file = Destination,
offset = CurOffset }),
- NextOffset = CurOffset + Size,
+ NextOffset = CurOffset + TotalSize,
if BlockStart =:= undefined ->
%% base case, called only for the first list elem
- {NextOffset, Offset, Offset + Size};
+ {NextOffset, Offset, Offset + TotalSize};
Offset =:= BlockEnd ->
%% extend the current block because the next
%% msg follows straight on
- {NextOffset, BlockStart, BlockEnd + Size};
+ {NextOffset, BlockStart, BlockEnd + TotalSize};
true ->
%% found a gap, so actually do the work for
%% the previous block
@@ -1458,7 +1454,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
file:position(SourceHdl, BlockStart),
{ok, BSize} =
file:copy(SourceHdl, DestinationHdl, BSize),
- {NextOffset, Offset, Offset + Size}
+ {NextOffset, Offset, Offset + TotalSize}
end
end, {InitOffset, undefined, undefined}, WorkList),
%% do the last remaining block
@@ -1729,7 +1725,7 @@ load_messages(Left, [], State) ->
offset = MaxOffset,
total_size = TotalSize} | _ ] =
sort_msg_locations_by_offset(desc, L),
- MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ MaxOffset + TotalSize
end,
State #dqstate { current_file_num = Num, current_file_name = Left,
current_offset = Offset };
@@ -1752,7 +1748,7 @@ load_messages(Left, [File|Files],
total_size = TotalSize,
is_persistent = IsPersistent }),
{[Obj | VMAcc],
- VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ VTSAcc + TotalSize
}
end
end, {[], 0}, Messages),
@@ -1854,7 +1850,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% Remember the head of the list will be the highest entry
%% in the file
[{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT,
+ TmpSize = TmpTopOffset + TmpTopTotalSize,
ExpectedAbsPos = Top + TmpSize,
{ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
%% and now extend the main file as big as necessary in a
@@ -1890,7 +1886,7 @@ find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds};
find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
| Tail], ExpectedOffset, MsgIds) ->
- ExpectedOffset1 = ExpectedOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedOffset1 = ExpectedOffset + TotalSize,
find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
@@ -1915,27 +1911,28 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
BodySize = size(MsgBody),
MsgIdBin = term_to_binary(MsgId),
MsgIdBinSize = size(MsgIdBin),
- TotalSize = BodySize + MsgIdBinSize,
+ Size = BodySize + MsgIdBinSize,
StopByte = case IsPersistent of
true -> ?WRITE_OK_PERSISTENT;
false -> ?WRITE_OK_TRANSIENT
end,
- case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
+ case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
MsgIdBin:MsgIdBinSize/binary,
MsgBody:BodySize/binary,
StopByte:?WRITE_OK_SIZE_BITS>>) of
- ok -> {ok, TotalSize};
+ ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
read_message_from_disk(FileHdl, TotalSize) ->
- TotalSizeWriteOkBytes = TotalSize + 1,
- case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
- {ok, <<TotalSize:?INTEGER_SIZE_BITS,
+ Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
+ SizeWriteOkBytes = Size + 1,
+ case file:read(FileHdl, TotalSize) of
+ {ok, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
- Rest:TotalSizeWriteOkBytes/binary>>} ->
- BodySize = TotalSize - MsgIdBinSize,
+ Rest:SizeWriteOkBytes/binary>>} ->
+ BodySize = Size - MsgIdBinSize,
<<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
StopByte:?WRITE_OK_SIZE_BITS>> = Rest,
Persistent = case StopByte of
@@ -1976,14 +1973,13 @@ read_next_file_entry(FileHdl, Offset) ->
TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
case file:read(FileHdl, TwoIntegers) of
{ok,
- <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
- case {TotalSize, MsgIdBinSize} of
+ <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
+ case {Size, MsgIdBinSize} of
{0, _} -> eof; %% Nothing we can do other than stop
{_, 0} ->
%% current message corrupted, try skipping past it
- ExpectedAbsPos =
- Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
- case file:position(FileHdl, {cur, TotalSize + 1}) of
+ ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
+ case file:position(FileHdl, {cur, Size + 1}) of
{ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos};
{ok, _SomeOtherPos} -> eof; %% seek failed, so give up
KO -> KO
@@ -1991,10 +1987,10 @@ read_next_file_entry(FileHdl, Offset) ->
{_, _} -> %% all good, let's continue
case file:read(FileHdl, MsgIdBinSize) of
{ok, <<MsgIdBin:MsgIdBinSize/binary>>} ->
- ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT +
- TotalSize - 1,
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedAbsPos = Offset + TotalSize - 1,
case file:position(
- FileHdl, {cur, TotalSize - MsgIdBinSize}) of
+ FileHdl, {cur, Size - MsgIdBinSize}) of
{ok, ExpectedAbsPos} ->
NextOffset = ExpectedAbsPos + 1,
case read_stop_byte(FileHdl) of