diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 36 |
2 files changed, 19 insertions, 21 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 6beccf3a87..b786f03659 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -544,7 +544,7 @@ internal_fetch_attributes(Q, MarkDelivered, Advance, case next(Q, MarkDelivered, Advance, State) of empty -> empty; {MsgId, IsDelivered, AckTag, Remaining} -> - IsPersistent = rabbit_msg_store:is_persistent(MsgId, Store), + IsPersistent = rabbit_msg_store:attrs(MsgId, Store), {MsgId, IsPersistent, IsDelivered, AckTag, Remaining} end. @@ -902,7 +902,7 @@ prune_mnesia(Store, Key, DeleteAcc, RemoveAcc, Len) -> [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = mnesia:dirty_read(rabbit_disk_queue, Key), {DeleteAcc1, RemoveAcc1, Len1} = - case rabbit_msg_store:is_persistent(MsgId, Store) of + case rabbit_msg_store:attrs(MsgId, Store) of not_found -> %% msg hasn't been found on disk, delete it {[{Q, SeqId} | DeleteAcc], RemoveAcc, Len + 1}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 427a669549..a1e9e17a1a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_store). --export([init/6, write/4, read/2, is_persistent/2, remove/2, release/2, +-export([init/6, write/4, read/2, attrs/2, remove/2, release/2, needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1, ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]). @@ -57,7 +57,7 @@ }). -record(msg_location, - {msg_id, ref_count, file, offset, total_size, is_persistent}). + {msg_id, ref_count, file, offset, total_size, attrs}). -record(file_summary, {file, valid_total_size, contiguous_top, left, right}). @@ -84,6 +84,7 @@ -type(ets_table() :: any()). -type(msg_id() :: any()). -type(msg() :: any()). +-type(msg_attrs() :: any()). -type(file_path() :: any()). -type(io_device() :: any()). @@ -109,9 +110,9 @@ non_neg_integer(), non_neg_integer(), fun ((msg_id()) -> non_neg_integer()), non_neg_integer()) -> msstate()). --spec(write/4 :: (msg_id(), msg(), boolean(), msstate()) -> msstate()). +-spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()). -spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found'). --spec(is_persistent/2 :: (msg_id(), msstate()) -> boolean() | 'not_found'). +-spec(attrs/2 :: (msg_id(), msstate()) -> msg_attrs() | 'not_found'). -spec(remove/2 :: ([msg_id()], msstate()) -> msstate()). -spec(release/2 :: ([msg_id()], msstate()) -> msstate()). -spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()). @@ -130,7 +131,7 @@ %% The components: %% %% MsgLocation: this is a (d)ets table which contains: -%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent} +%% {MsgId, RefCount, File, Offset, TotalSize, Attrs} %% FileSummary: this is an ets table which contains: %% {File, ValidTotalSize, ContiguousTop, Left, Right} %% @@ -317,7 +318,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, State1 #msstate { current_file_handle = FileHdl }. -write(MsgId, Msg, IsPersistent, +write(MsgId, Msg, Attrs, State = #msstate { current_file_handle = CurHdl, current_file_name = CurName, current_offset = CurOffset, @@ -325,13 +326,12 @@ write(MsgId, Msg, IsPersistent, case dets_ets_lookup(State, MsgId) of [] -> %% New message, lots to do - {ok, TotalSize} = rabbit_msg_file:append( - CurHdl, MsgId, Msg, IsPersistent), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs), true = dets_ets_insert_new( State, #msg_location { msg_id = MsgId, ref_count = 1, file = CurName, offset = CurOffset, total_size = TotalSize, - is_persistent = IsPersistent }), + attrs = Attrs }), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, right = undefined }] = @@ -368,7 +368,7 @@ read(MsgId, State) -> total_size = TotalSize }] -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {{ok, {MsgId, Msg, _IsPersistent}}, State1} = + {{ok, {MsgId, Msg, _Attrs}}, State1} = with_read_handle_at( File, Offset, fun(Hdl) -> @@ -399,13 +399,11 @@ read(MsgId, State) -> end end. -is_persistent(MsgId, State) -> +attrs(MsgId, State) -> Objs = dets_ets_lookup(State, MsgId), case Objs of - [] -> - not_found; - [#msg_location { msg_id = MsgId, is_persistent = IsPersistent }] -> - IsPersistent + [] -> not_found; + [#msg_location { msg_id = MsgId, attrs = Attrs }] -> Attrs end. remove(MsgIds, State = #msstate { current_file_name = CurName }) -> @@ -790,7 +788,7 @@ verify_messages_referenced(RefCountFun, MsgIds) -> scan_file_for_valid_messages_msg_ids(Dir, File) -> {ok, Messages} = scan_file_for_valid_messages(Dir, File), {ok, Messages, - [MsgId || {MsgId, _IsPersistent, _TotalSize, _FileOffset} <- Messages]}. + [MsgId || {MsgId, _Attrs, _TotalSize, _FileOffset} <- Messages]}. scan_file_for_valid_messages(Dir, File) -> case open_file(Dir, File, ?READ_MODE) of @@ -813,7 +811,7 @@ find_contiguous_block_prefix(List) -> find_contiguous_block_prefix([], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}; -find_contiguous_block_prefix([{MsgId, _IsPersistent, TotalSize, ExpectedOffset} +find_contiguous_block_prefix([{MsgId, _Attrs, TotalSize, ExpectedOffset} | Tail], ExpectedOffset, MsgIds) -> ExpectedOffset1 = ExpectedOffset + TotalSize, find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]); @@ -845,7 +843,7 @@ load_messages(RefCountFun, Left, [File|Files], State = #msstate { dir = Dir, file_summary = FileSummary }) -> {ok, Messages} = scan_file_for_valid_messages(Dir, File), {ValidMessages, ValidTotalSize} = lists:foldl( - fun (Obj = {MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) -> case RefCountFun(MsgId) of 0 -> {VMAcc, VTSAcc}; RefCount -> @@ -854,7 +852,7 @@ load_messages(RefCountFun, Left, [File|Files], msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize, - is_persistent = IsPersistent }), + attrs = Attrs }), {[Obj | VMAcc], VTSAcc + TotalSize} end end, {[], 0}, Messages), |
