summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-08-29 10:32:42 +0100
committerMatthias Radestock <matthias@lshift.net>2009-08-29 10:32:42 +0100
commite30eba75c782f5b6340fd7ebae835665cd336ea9 (patch)
tree0f6555f6c862e20e581e705ad11432735e47b8fe
parentef2d908ab3b26aea0e832aa7197c877d93ff52e7 (diff)
downloadrabbitmq-server-git-e30eba75c782f5b6340fd7ebae835665cd336ea9.tar.gz
banish ?FILE_PACKING_ADJUSTMENT from all but three functions
The details of the message packing are opaque to high level code, as they should be. The TotalSize that code sees now is the total size of the message on disk, including all packing adjustments, which is all that is ever needed to perform all the necessary file positioning etc at that level.
-rw-r--r--src/rabbit_disk_queue.erl62
1 files changed, 29 insertions, 33 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index d3bae0bb4c..4205dca590 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -923,7 +923,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
{offset, Offset},
{read, Rest}]}})
end,
- {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT, Res}
+ {Offset + TotalSize, Res}
end, State),
Message = #basic_message {} = bin_to_msg(MsgBody),
ok = if RefCount > 1 ->
@@ -1003,10 +1003,8 @@ remove_message(MsgId, Files,
ets:lookup(FileSummary, File),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
true =
- ets:insert(FileSummary,
- {File,
- (ValidTotalSize-TotalSize-?FILE_PACKING_ADJUSTMENT),
- ContiguousTop1, Left, Right}),
+ ets:insert(FileSummary, {File, ValidTotalSize - TotalSize,
+ ContiguousTop1, Left, Right}),
if CurName =:= File -> Files;
true -> sets:add_element(File, Files)
end;
@@ -1036,8 +1034,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
is_persistent = IsPersistent }),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
ets:lookup(FileSummary, CurName),
- ValidTotalSize1 = ValidTotalSize + TotalSize +
- ?FILE_PACKING_ADJUSTMENT,
+ ValidTotalSize1 = ValidTotalSize + TotalSize,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
%% can't be any holes in this file
ValidTotalSize1;
@@ -1045,7 +1042,7 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
end,
true = ets:insert(FileSummary, {CurName, ValidTotalSize1,
ContiguousTop1, Left, undefined}),
- NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ NextOffset = CurOffset + TotalSize,
maybe_roll_to_new_file(
NextOffset, State #dqstate {current_offset = NextOffset,
current_dirty = true});
@@ -1437,19 +1434,18 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
- Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
%% update MsgLocationDets to reflect change of file and offset
ok = dets_ets_insert(State, StoreEntry #message_store_entry
{ file = Destination,
offset = CurOffset }),
- NextOffset = CurOffset + Size,
+ NextOffset = CurOffset + TotalSize,
if BlockStart =:= undefined ->
%% base case, called only for the first list elem
- {NextOffset, Offset, Offset + Size};
+ {NextOffset, Offset, Offset + TotalSize};
Offset =:= BlockEnd ->
%% extend the current block because the next
%% msg follows straight on
- {NextOffset, BlockStart, BlockEnd + Size};
+ {NextOffset, BlockStart, BlockEnd + TotalSize};
true ->
%% found a gap, so actually do the work for
%% the previous block
@@ -1458,7 +1454,7 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
file:position(SourceHdl, BlockStart),
{ok, BSize} =
file:copy(SourceHdl, DestinationHdl, BSize),
- {NextOffset, Offset, Offset + Size}
+ {NextOffset, Offset, Offset + TotalSize}
end
end, {InitOffset, undefined, undefined}, WorkList),
%% do the last remaining block
@@ -1729,7 +1725,7 @@ load_messages(Left, [], State) ->
offset = MaxOffset,
total_size = TotalSize} | _ ] =
sort_msg_locations_by_offset(desc, L),
- MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ MaxOffset + TotalSize
end,
State #dqstate { current_file_num = Num, current_file_name = Left,
current_offset = Offset };
@@ -1752,7 +1748,7 @@ load_messages(Left, [File|Files],
total_size = TotalSize,
is_persistent = IsPersistent }),
{[Obj | VMAcc],
- VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ VTSAcc + TotalSize
}
end
end, {[], 0}, Messages),
@@ -1854,7 +1850,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% Remember the head of the list will be the highest entry
%% in the file
[{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT,
+ TmpSize = TmpTopOffset + TmpTopTotalSize,
ExpectedAbsPos = Top + TmpSize,
{ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
%% and now extend the main file as big as necessary in a
@@ -1890,7 +1886,7 @@ find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds};
find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset}
| Tail], ExpectedOffset, MsgIds) ->
- ExpectedOffset1 = ExpectedOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedOffset1 = ExpectedOffset + TotalSize,
find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
@@ -1915,27 +1911,28 @@ append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
BodySize = size(MsgBody),
MsgIdBin = term_to_binary(MsgId),
MsgIdBinSize = size(MsgIdBin),
- TotalSize = BodySize + MsgIdBinSize,
+ Size = BodySize + MsgIdBinSize,
StopByte = case IsPersistent of
true -> ?WRITE_OK_PERSISTENT;
false -> ?WRITE_OK_TRANSIENT
end,
- case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
+ case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
MsgIdBin:MsgIdBinSize/binary,
MsgBody:BodySize/binary,
StopByte:?WRITE_OK_SIZE_BITS>>) of
- ok -> {ok, TotalSize};
+ ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
read_message_from_disk(FileHdl, TotalSize) ->
- TotalSizeWriteOkBytes = TotalSize + 1,
- case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
- {ok, <<TotalSize:?INTEGER_SIZE_BITS,
+ Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
+ SizeWriteOkBytes = Size + 1,
+ case file:read(FileHdl, TotalSize) of
+ {ok, <<Size:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
- Rest:TotalSizeWriteOkBytes/binary>>} ->
- BodySize = TotalSize - MsgIdBinSize,
+ Rest:SizeWriteOkBytes/binary>>} ->
+ BodySize = Size - MsgIdBinSize,
<<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
StopByte:?WRITE_OK_SIZE_BITS>> = Rest,
Persistent = case StopByte of
@@ -1976,14 +1973,13 @@ read_next_file_entry(FileHdl, Offset) ->
TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
case file:read(FileHdl, TwoIntegers) of
{ok,
- <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
- case {TotalSize, MsgIdBinSize} of
+ <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
+ case {Size, MsgIdBinSize} of
{0, _} -> eof; %% Nothing we can do other than stop
{_, 0} ->
%% current message corrupted, try skipping past it
- ExpectedAbsPos =
- Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
- case file:position(FileHdl, {cur, TotalSize + 1}) of
+ ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
+ case file:position(FileHdl, {cur, Size + 1}) of
{ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos};
{ok, _SomeOtherPos} -> eof; %% seek failed, so give up
KO -> KO
@@ -1991,10 +1987,10 @@ read_next_file_entry(FileHdl, Offset) ->
{_, _} -> %% all good, let's continue
case file:read(FileHdl, MsgIdBinSize) of
{ok, <<MsgIdBin:MsgIdBinSize/binary>>} ->
- ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT +
- TotalSize - 1,
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedAbsPos = Offset + TotalSize - 1,
case file:position(
- FileHdl, {cur, TotalSize - MsgIdBinSize}) of
+ FileHdl, {cur, Size - MsgIdBinSize}) of
{ok, ExpectedAbsPos} ->
NextOffset = ExpectedAbsPos + 1,
case read_stop_byte(FileHdl) of