diff options
-rw-r--r--  src/rabbit_disk_queue.erl |  5
-rw-r--r--  src/rabbit_msg_file.erl   | 51
-rw-r--r--  src/rabbit_msg_store.erl  | 43
3 files changed, 40 insertions(+), 59 deletions(-)
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 0f69f83b76..ee5fead7e4 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -439,13 +439,12 @@ remove_messages(Q, MsgSeqIds, State = #dqstate { store = Store } ) -> Store1 = rabbit_msg_store:remove(MsgIds, Store), {ok, State #dqstate { store = Store1 }}. -internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, - guid = MsgId, +internal_tx_publish(Message = #basic_message { guid = MsgId, content = Content }, State = #dqstate { store = Store }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), Message1 = Message #basic_message { content = ClearedContent }, - Store1 = rabbit_msg_store:write(MsgId, Message1, IsPersistent, Store), + Store1 = rabbit_msg_store:write(MsgId, Message1, Store), {ok, State #dqstate { store = Store1 }}. internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 46128612e7..94525d84f8 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_file). --export([append/4, read/2, scan/1]). +-export([append/3, read/2, scan/1]). %%---------------------------------------------------------------------------- @@ -39,7 +39,7 @@ -define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(WRITE_OK_SIZE_BITS, 8). -define(WRITE_OK_MARKER, 255). --define(FILE_PACKING_ADJUSTMENT, (1 + (3 * (?INTEGER_SIZE_BYTES)))). +-define(FILE_PACKING_ADJUSTMENT, (1 + (2 * (?INTEGER_SIZE_BYTES)))). %%---------------------------------------------------------------------------- @@ -48,32 +48,27 @@ -type(io_device() :: any()). -type(msg_id() :: binary()). -type(msg() :: any()). --type(msg_attrs() :: any()). -type(position() :: non_neg_integer()). -type(msg_size() :: non_neg_integer()). 
--spec(append/4 :: (io_device(), msg_id(), msg(), msg_attrs()) -> +-spec(append/3 :: (io_device(), msg_id(), msg()) -> ({'ok', msg_size()} | {'error', any()})). -spec(read/2 :: (io_device(), msg_size()) -> - ({'ok', {msg_id(), msg(), msg_attrs()}} | {'error', any()})). + ({'ok', {msg_id(), msg()}} | {'error', any()})). -spec(scan/1 :: (io_device()) -> - {'ok', [{msg_id(), msg_attrs(), msg_size(), position()}]}). + {'ok', [{msg_id(), msg_size(), position()}]}). -endif. %%---------------------------------------------------------------------------- -append(FileHdl, MsgId, MsgBody, MsgAttrs) when is_binary(MsgId) -> +append(FileHdl, MsgId, MsgBody) when is_binary(MsgId) -> MsgBodyBin = term_to_binary(MsgBody), - MsgAttrsBin = term_to_binary(MsgAttrs), - [MsgIdSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes = - [size(B) || B <- [MsgId, MsgBodyBin, MsgAttrsBin]], + [MsgIdSize, MsgBodyBinSize] = Sizes = [size(B) || B <- [MsgId, MsgBodyBin]], Size = lists:sum(Sizes), case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, MsgIdSize:?INTEGER_SIZE_BITS, - MsgAttrsBinSize:?INTEGER_SIZE_BITS, MsgId:MsgIdSize/binary, - MsgAttrsBin:MsgAttrsBinSize/binary, MsgBodyBin:MsgBodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; @@ -86,15 +81,12 @@ read(FileHdl, TotalSize) -> case file:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdSize:?INTEGER_SIZE_BITS, - MsgAttrsBinSize:?INTEGER_SIZE_BITS, Rest:SizeWriteOkBytes/binary>>} -> - BodyBinSize = Size - MsgIdSize - MsgAttrsBinSize, + BodyBinSize = Size - MsgIdSize, <<MsgId:MsgIdSize/binary, - MsgAttrsBin:MsgAttrsBinSize/binary, MsgBodyBin:BodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest, - {ok, {MsgId, - binary_to_term(MsgBodyBin), binary_to_term(MsgAttrsBin)}}; + {ok, {MsgId, binary_to_term(MsgBodyBin)}}; KO -> KO end. 
@@ -105,23 +97,19 @@ scan(FileHdl, Offset, Acc) -> eof -> {ok, Acc}; {corrupted, NextOffset} -> scan(FileHdl, NextOffset, Acc); - {ok, {MsgId, MsgAttrs, TotalSize, NextOffset}} -> - scan(FileHdl, NextOffset, - [{MsgId, MsgAttrs, TotalSize, Offset} | Acc]); + {ok, {MsgId, TotalSize, NextOffset}} -> + scan(FileHdl, NextOffset, [{MsgId, TotalSize, Offset} | Acc]); _KO -> %% bad message, but we may still have recovered some valid messages {ok, Acc} end. read_next(FileHdl, Offset) -> - ThreeIntegers = 3 * ?INTEGER_SIZE_BYTES, - case file:read(FileHdl, ThreeIntegers) of - {ok, - <<Size:?INTEGER_SIZE_BITS, - MsgIdSize:?INTEGER_SIZE_BITS, - MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} -> + TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, + case file:read(FileHdl, TwoIntegers) of + {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdSize:?INTEGER_SIZE_BITS>>} -> if Size == 0 -> eof; %% Nothing we can do other than stop - MsgIdSize == 0 orelse MsgAttrsBinSize == 0 -> + MsgIdSize == 0 -> %% current message corrupted, try skipping past it ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, case file:position(FileHdl, {cur, Size + 1}) of @@ -130,21 +118,18 @@ read_next(FileHdl, Offset) -> KO -> KO end; true -> %% all good, let's continue - HeaderSize = MsgIdSize + MsgAttrsBinSize, - case file:read(FileHdl, HeaderSize) of - {ok, <<MsgId:MsgIdSize/binary, - MsgAttrsBin:MsgAttrsBinSize/binary>>} -> + case file:read(FileHdl, MsgIdSize) of + {ok, <<MsgId:MsgIdSize/binary>>} -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Offset + TotalSize - 1, case file:position( - FileHdl, {cur, Size - HeaderSize}) of + FileHdl, {cur, Size - MsgIdSize}) of {ok, ExpectedAbsPos} -> NextOffset = ExpectedAbsPos + 1, case file:read(FileHdl, 1) of {ok, <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> {ok, {MsgId, - binary_to_term(MsgAttrsBin), TotalSize, NextOffset}}; {ok, _SomeOtherData} -> {corrupted, NextOffset}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b752b9f6f4..66bfb719f8 
100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_store). --export([init/5, write/4, read/2, contains/2, remove/2, release/2, +-export([init/5, write/3, read/2, contains/2, remove/2, release/2, needs_sync/2, sync/1, cleanup/1]). %%---------------------------------------------------------------------------- @@ -52,7 +52,7 @@ }). -record(msg_location, - {msg_id, ref_count, file, offset, total_size, attrs}). + {msg_id, ref_count, file, offset, total_size}). -record(file_summary, {file, valid_total_size, contiguous_top, left, right}). @@ -75,7 +75,6 @@ -type(ets_table() :: any()). -type(msg_id() :: binary()). -type(msg() :: any()). --type(msg_attrs() :: any()). -type(file_path() :: any()). -type(io_device() :: any()). @@ -97,7 +96,7 @@ non_neg_integer(), non_neg_integer(), (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) -> msstate()). --spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()). +-spec(write/3 :: (msg_id(), msg(), msstate()) -> msstate()). -spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found'). -spec(contains/2 :: (msg_id(), msstate()) -> boolean()). -spec(remove/2 :: ([msg_id()], msstate()) -> msstate()). @@ -113,7 +112,7 @@ %% The components: %% %% MsgLocation: this is an ets table which contains: -%% {MsgId, RefCount, File, Offset, TotalSize, Attrs} +%% {MsgId, RefCount, File, Offset, TotalSize} %% FileSummary: this is an ets table which contains: %% {File, ValidTotalSize, ContiguousTop, Left, Right} %% @@ -277,19 +276,18 @@ init(Dir, FileSizeLimit, ReadFileHandlesLimit, State1 #msstate { current_file_handle = FileHdl }. 
-write(MsgId, Msg, Attrs, - State = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - current_offset = CurOffset, - file_summary = FileSummary }) -> +write(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + current_offset = CurOffset, + file_summary = FileSummary }) -> case index_lookup(MsgId, State) of not_found -> %% New message, lots to do - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert(#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile, - offset = CurOffset, total_size = TotalSize, - attrs = Attrs }, State), + offset = CurOffset, total_size = TotalSize }, + State), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, right = undefined }] = @@ -323,13 +321,13 @@ read(MsgId, State) -> total_size = TotalSize } -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {{ok, {MsgId, Msg, _Attrs}}, State1} = + {{ok, {MsgId, Msg}}, State1} = with_read_handle_at( File, Offset, fun(Hdl) -> Res = case rabbit_msg_file:read( Hdl, TotalSize) of - {ok, {MsgId, _, _}} = Obj -> Obj; + {ok, {MsgId, _}} = Obj -> Obj; {ok, Rest} -> throw({error, {misread, @@ -654,7 +652,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% Wipe out any rubbish at the end of the file. Remember %% the head of the list will be the highest entry in the %% file. - [{_, _, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, + [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, TmpSize = TmpTopOffset + TmpTopTotalSize, %% Extend the main file as big as necessary in a single %% move. 
If we run out of disk space, this truncate could @@ -685,8 +683,7 @@ is_disjoint(SmallerL, BiggerL) -> scan_file_for_valid_messages_msg_ids(Dir, FileName) -> {ok, Messages} = scan_file_for_valid_messages(Dir, FileName), - {ok, Messages, - [MsgId || {MsgId, _Attrs, _TotalSize, _FileOffset} <- Messages]}. + {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}. scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of @@ -710,8 +707,8 @@ find_contiguous_block_prefix(List) -> find_contiguous_block_prefix([], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}; -find_contiguous_block_prefix([{MsgId, _Attrs, TotalSize, ExpectedOffset} - | Tail], ExpectedOffset, MsgIds) -> +find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail], + ExpectedOffset, MsgIds) -> ExpectedOffset1 = ExpectedOffset + TotalSize, find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]); find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> @@ -738,15 +735,15 @@ build_index(Left, [File|Files], FilesToCompact, {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize, AllValid} = lists:foldl( - fun (Obj = {MsgId, Attrs, TotalSize, Offset}, + fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc, AVAcc}) -> case index_lookup(MsgId, State) of not_found -> {VMAcc, VTSAcc, false}; StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, - total_size = TotalSize, - attrs = Attrs }, State), + total_size = TotalSize }, + State), {[Obj | VMAcc], VTSAcc + TotalSize, AVAcc} end end, {[], 0, Messages =/= []}, Messages), |
