summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl5
-rw-r--r--src/rabbit_msg_file.erl51
-rw-r--r--src/rabbit_msg_store.erl43
3 files changed, 40 insertions, 59 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 0f69f83b76..ee5fead7e4 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -439,13 +439,12 @@ remove_messages(Q, MsgSeqIds, State = #dqstate { store = Store } ) ->
Store1 = rabbit_msg_store:remove(MsgIds, Store),
{ok, State #dqstate { store = Store1 }}.
-internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
- guid = MsgId,
+internal_tx_publish(Message = #basic_message { guid = MsgId,
content = Content },
State = #dqstate { store = Store }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
Message1 = Message #basic_message { content = ClearedContent },
- Store1 = rabbit_msg_store:write(MsgId, Message1, IsPersistent, Store),
+ Store1 = rabbit_msg_store:write(MsgId, Message1, Store),
{ok, State #dqstate { store = Store1 }}.
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 46128612e7..94525d84f8 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_file).
--export([append/4, read/2, scan/1]).
+-export([append/3, read/2, scan/1]).
%%----------------------------------------------------------------------------
@@ -39,7 +39,7 @@
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(WRITE_OK_SIZE_BITS, 8).
-define(WRITE_OK_MARKER, 255).
--define(FILE_PACKING_ADJUSTMENT, (1 + (3 * (?INTEGER_SIZE_BYTES)))).
+-define(FILE_PACKING_ADJUSTMENT, (1 + (2 * (?INTEGER_SIZE_BYTES)))).
%%----------------------------------------------------------------------------
@@ -48,32 +48,27 @@
-type(io_device() :: any()).
-type(msg_id() :: binary()).
-type(msg() :: any()).
--type(msg_attrs() :: any()).
-type(position() :: non_neg_integer()).
-type(msg_size() :: non_neg_integer()).
--spec(append/4 :: (io_device(), msg_id(), msg(), msg_attrs()) ->
+-spec(append/3 :: (io_device(), msg_id(), msg()) ->
({'ok', msg_size()} | {'error', any()})).
-spec(read/2 :: (io_device(), msg_size()) ->
- ({'ok', {msg_id(), msg(), msg_attrs()}} | {'error', any()})).
+ ({'ok', {msg_id(), msg()}} | {'error', any()})).
-spec(scan/1 :: (io_device()) ->
- {'ok', [{msg_id(), msg_attrs(), msg_size(), position()}]}).
+ {'ok', [{msg_id(), msg_size(), position()}]}).
-endif.
%%----------------------------------------------------------------------------
-append(FileHdl, MsgId, MsgBody, MsgAttrs) when is_binary(MsgId) ->
+append(FileHdl, MsgId, MsgBody) when is_binary(MsgId) ->
MsgBodyBin = term_to_binary(MsgBody),
- MsgAttrsBin = term_to_binary(MsgAttrs),
- [MsgIdSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes =
- [size(B) || B <- [MsgId, MsgBodyBin, MsgAttrsBin]],
+ [MsgIdSize, MsgBodyBinSize] = Sizes = [size(B) || B <- [MsgId, MsgBodyBin]],
Size = lists:sum(Sizes),
case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
MsgIdSize:?INTEGER_SIZE_BITS,
- MsgAttrsBinSize:?INTEGER_SIZE_BITS,
MsgId:MsgIdSize/binary,
- MsgAttrsBin:MsgAttrsBinSize/binary,
MsgBodyBin:MsgBodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
@@ -86,15 +81,12 @@ read(FileHdl, TotalSize) ->
case file:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
MsgIdSize:?INTEGER_SIZE_BITS,
- MsgAttrsBinSize:?INTEGER_SIZE_BITS,
Rest:SizeWriteOkBytes/binary>>} ->
- BodyBinSize = Size - MsgIdSize - MsgAttrsBinSize,
+ BodyBinSize = Size - MsgIdSize,
<<MsgId:MsgIdSize/binary,
- MsgAttrsBin:MsgAttrsBinSize/binary,
MsgBodyBin:BodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest,
- {ok, {MsgId,
- binary_to_term(MsgBodyBin), binary_to_term(MsgAttrsBin)}};
+ {ok, {MsgId, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
@@ -105,23 +97,19 @@ scan(FileHdl, Offset, Acc) ->
eof -> {ok, Acc};
{corrupted, NextOffset} ->
scan(FileHdl, NextOffset, Acc);
- {ok, {MsgId, MsgAttrs, TotalSize, NextOffset}} ->
- scan(FileHdl, NextOffset,
- [{MsgId, MsgAttrs, TotalSize, Offset} | Acc]);
+ {ok, {MsgId, TotalSize, NextOffset}} ->
+ scan(FileHdl, NextOffset, [{MsgId, TotalSize, Offset} | Acc]);
_KO ->
%% bad message, but we may still have recovered some valid messages
{ok, Acc}
end.
read_next(FileHdl, Offset) ->
- ThreeIntegers = 3 * ?INTEGER_SIZE_BYTES,
- case file:read(FileHdl, ThreeIntegers) of
- {ok,
- <<Size:?INTEGER_SIZE_BITS,
- MsgIdSize:?INTEGER_SIZE_BITS,
- MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} ->
+ TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
+ case file:read(FileHdl, TwoIntegers) of
+ {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdSize:?INTEGER_SIZE_BITS>>} ->
if Size == 0 -> eof; %% Nothing we can do other than stop
- MsgIdSize == 0 orelse MsgAttrsBinSize == 0 ->
+ MsgIdSize == 0 ->
%% current message corrupted, try skipping past it
ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
case file:position(FileHdl, {cur, Size + 1}) of
@@ -130,21 +118,18 @@ read_next(FileHdl, Offset) ->
KO -> KO
end;
true -> %% all good, let's continue
- HeaderSize = MsgIdSize + MsgAttrsBinSize,
- case file:read(FileHdl, HeaderSize) of
- {ok, <<MsgId:MsgIdSize/binary,
- MsgAttrsBin:MsgAttrsBinSize/binary>>} ->
+ case file:read(FileHdl, MsgIdSize) of
+ {ok, <<MsgId:MsgIdSize/binary>>} ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
ExpectedAbsPos = Offset + TotalSize - 1,
case file:position(
- FileHdl, {cur, Size - HeaderSize}) of
+ FileHdl, {cur, Size - MsgIdSize}) of
{ok, ExpectedAbsPos} ->
NextOffset = ExpectedAbsPos + 1,
case file:read(FileHdl, 1) of
{ok, <<?WRITE_OK_MARKER:
?WRITE_OK_SIZE_BITS>>} ->
{ok, {MsgId,
- binary_to_term(MsgAttrsBin),
TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
{corrupted, NextOffset};
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b752b9f6f4..66bfb719f8 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_store).
--export([init/5, write/4, read/2, contains/2, remove/2, release/2,
+-export([init/5, write/3, read/2, contains/2, remove/2, release/2,
needs_sync/2, sync/1, cleanup/1]).
%%----------------------------------------------------------------------------
@@ -52,7 +52,7 @@
}).
-record(msg_location,
- {msg_id, ref_count, file, offset, total_size, attrs}).
+ {msg_id, ref_count, file, offset, total_size}).
-record(file_summary,
{file, valid_total_size, contiguous_top, left, right}).
@@ -75,7 +75,6 @@
-type(ets_table() :: any()).
-type(msg_id() :: binary()).
-type(msg() :: any()).
--type(msg_attrs() :: any()).
-type(file_path() :: any()).
-type(io_device() :: any()).
@@ -97,7 +96,7 @@
non_neg_integer(), non_neg_integer(),
(fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})),
A) -> msstate()).
--spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()).
+-spec(write/3 :: (msg_id(), msg(), msstate()) -> msstate()).
-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
-spec(contains/2 :: (msg_id(), msstate()) -> boolean()).
-spec(remove/2 :: ([msg_id()], msstate()) -> msstate()).
@@ -113,7 +112,7 @@
%% The components:
%%
%% MsgLocation: this is an ets table which contains:
-%% {MsgId, RefCount, File, Offset, TotalSize, Attrs}
+%% {MsgId, RefCount, File, Offset, TotalSize}
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
%%
@@ -277,19 +276,18 @@ init(Dir, FileSizeLimit, ReadFileHandlesLimit,
State1 #msstate { current_file_handle = FileHdl }.
-write(MsgId, Msg, Attrs,
- State = #msstate { current_file_handle = CurHdl,
- current_file = CurFile,
- current_offset = CurOffset,
- file_summary = FileSummary }) ->
+write(MsgId, Msg, State = #msstate { current_file_handle = CurHdl,
+ current_file = CurFile,
+ current_offset = CurOffset,
+ file_summary = FileSummary }) ->
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
msg_id = MsgId, ref_count = 1, file = CurFile,
- offset = CurOffset, total_size = TotalSize,
- attrs = Attrs }, State),
+ offset = CurOffset, total_size = TotalSize },
+ State),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
right = undefined }] =
@@ -323,13 +321,13 @@ read(MsgId, State) ->
total_size = TotalSize } ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {{ok, {MsgId, Msg, _Attrs}}, State1} =
+ {{ok, {MsgId, Msg}}, State1} =
with_read_handle_at(
File, Offset,
fun(Hdl) ->
Res = case rabbit_msg_file:read(
Hdl, TotalSize) of
- {ok, {MsgId, _, _}} = Obj -> Obj;
+ {ok, {MsgId, _}} = Obj -> Obj;
{ok, Rest} ->
throw({error,
{misread,
@@ -654,7 +652,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% Wipe out any rubbish at the end of the file. Remember
%% the head of the list will be the highest entry in the
%% file.
- [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
+ [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
TmpSize = TmpTopOffset + TmpTopTotalSize,
%% Extend the main file as big as necessary in a single
%% move. If we run out of disk space, this truncate could
@@ -685,8 +683,7 @@ is_disjoint(SmallerL, BiggerL) ->
scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, FileName),
- {ok, Messages,
- [MsgId || {MsgId, _Attrs, _TotalSize, _FileOffset} <- Messages]}.
+ {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}.
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
@@ -710,8 +707,8 @@ find_contiguous_block_prefix(List) ->
find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds};
-find_contiguous_block_prefix([{MsgId, _Attrs, TotalSize, ExpectedOffset}
- | Tail], ExpectedOffset, MsgIds) ->
+find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail],
+ ExpectedOffset, MsgIds) ->
ExpectedOffset1 = ExpectedOffset + TotalSize,
find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
@@ -738,15 +735,15 @@ build_index(Left, [File|Files], FilesToCompact,
{ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize, AllValid} =
lists:foldl(
- fun (Obj = {MsgId, Attrs, TotalSize, Offset},
+ fun (Obj = {MsgId, TotalSize, Offset},
{VMAcc, VTSAcc, AVAcc}) ->
case index_lookup(MsgId, State) of
not_found -> {VMAcc, VTSAcc, false};
StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
- total_size = TotalSize,
- attrs = Attrs }, State),
+ total_size = TotalSize },
+ State),
{[Obj | VMAcc], VTSAcc + TotalSize, AVAcc}
end
end, {[], 0, Messages =/= []}, Messages),