diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 107 |
2 files changed, 80 insertions, 57 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c26221842e..b7ed868b28 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -245,12 +245,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> ok = detect_shutdown_state_and_adjust_delivered_flags(), - ok = add_index(), Store = rabbit_msg_store:init(Mode, base_directory(), FileSizeLimit, ReadFileHandlesLimit, - fun ref_count/1, EtsBPR), + fun msg_ref_gen/1, msg_ref_gen_init(), + EtsBPR), Store1 = prune(Store), - ok = del_index(), Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), ok = extract_sequence_numbers(Sequences), @@ -870,21 +869,12 @@ mark_message_delivered(Key, N) -> end, mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M). -add_index() -> - case mnesia:add_table_index(rabbit_disk_queue, msg_id) of - {atomic, ok} -> ok; - {aborted,{already_exists,rabbit_disk_queue,_}} -> ok; - E -> E - end. +msg_ref_gen_init() -> mnesia:dirty_first(rabbit_disk_queue). -del_index() -> - case mnesia:del_table_index(rabbit_disk_queue, msg_id) of - {atomic, ok} -> ok; - %% hmm, something weird must be going on, but it's probably - %% not the end of the world - {aborted, {no_exists, rabbit_disk_queue,_}} -> ok; - E1 -> E1 - end. +msg_ref_gen('$end_of_table') -> finished; +msg_ref_gen(Key) -> + [Obj] = mnesia:dirty_read(rabbit_disk_queue, Key), + {Obj #dq_msg_loc.msg_id, 1, mnesia:dirty_next(rabbit_disk_queue, Key)}. prune_flush_batch(DeleteAcc, RemoveAcc, Store) -> lists:foldl(fun (Key, ok) -> @@ -1003,9 +993,3 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> 0 end, shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc). - -ref_count(MsgId) -> - length(mnesia:dirty_index_match_object( - rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, _ = '_' }, - msg_id)). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d5959f9f38..b745acbf08 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -31,7 +31,7 @@ -module(rabbit_msg_store). --export([init/6, write/4, read/2, attrs/2, remove/2, release/2, +-export([init/7, write/4, read/2, attrs/2, remove/2, release/2, needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1, ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]). @@ -104,9 +104,10 @@ ets_bytes_per_record :: non_neg_integer() }). --spec(init/6 :: ('ram_disk' | 'disk_only', file_path(), +-spec(init/7 :: ('ram_disk' | 'disk_only', file_path(), non_neg_integer(), non_neg_integer(), - fun ((msg_id()) -> non_neg_integer()), non_neg_integer()) -> + (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), + A, non_neg_integer()) -> msstate()). -spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()). -spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found'). @@ -241,8 +242,8 @@ %% from the store when it has been removed the same number of times. %% %% The reference counts do not persist. Therefore the initialisation -%% function must be provided with a function that determines the -%% initial reference count of any (recovered) message. +%% function must be provided with a generator that produces ref count +%% deltas for all recovered messages. %% %% Read messages with a reference count greater than one are entered %% into a message cache. The purpose of the cache is not especially @@ -256,8 +257,8 @@ %% public API %%---------------------------------------------------------------------------- -init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, - EtsBytesPerRecord) -> +init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, + MsgRefDeltaGen, MsgRefDeltaGenInit, EtsBytesPerRecord) -> file:delete(msg_location_dets_file(Dir)), @@ -298,17 +299,18 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun, message_cache = MessageCache, ets_bytes_per_record = EtsBytesPerRecord }, - + + ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), FileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), TmpFileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames), + ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames, State), %% There should be no more tmp files now, so go ahead and load the %% whole lot Files = [filename_to_num(FileName) || FileName <- FileNames], State1 = #msstate { current_file = CurFile, current_offset = Offset } = - load_messages(RefCountFun, Files, State), + load_messages(Files, State), %% read is only needed so that we can seek {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile), @@ -666,26 +668,66 @@ dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, Obj) -> ets:match_object(MsgLocationEts, Obj). +dets_ets_select_delete(#msstate { msg_location_dets = MsgLocationDets, + operation_mode = disk_only }, MatchSpec) -> + dets:select_delete(MsgLocationDets, MatchSpec); +dets_ets_select_delete(#msstate { msg_location_ets = MsgLocationEts, + operation_mode = ram_disk }, MatchSpec) -> + ets:select_delete(MsgLocationEts, MatchSpec). + %%---------------------------------------------------------------------------- %% recovery %%---------------------------------------------------------------------------- -recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames) -> +count_msg_refs(Gen, Seed, State) -> + case Gen(Seed) of + finished -> ok; + {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State); + {MsgId, Delta, Next} -> + case dets_ets_lookup(State, MsgId) of + [] -> true = dets_ets_insert_new( + State, #msg_location { msg_id = MsgId, + ref_count = Delta }); + [StoreEntry = #msg_location { msg_id = MsgId, + ref_count = RefCount }] -> + NewRefCount = RefCount + Delta, + case NewRefCount of + 0 -> ok = dets_ets_delete(State, MsgId); + _ -> ok = dets_ets_insert( + State, StoreEntry #msg_location { + ref_count = NewRefCount }) + end + end, + count_msg_refs(Gen, Next, State) + end. + +verify_messages_referenced(State, MsgIds) -> + lists:foreach(fun (MsgId) -> [_] = dets_ets_lookup(State, MsgId) end, + MsgIds). + +prune_stale_refs(State) -> + MatchHead = #msg_location { file = undefined, _ = '_' }, + case dets_ets_select_delete(State, [{MatchHead, [], [true]}]) of + N when is_number(N) -> ok; + Other -> Other + end. + +recover_crashed_compactions(Dir, FileNames, TmpFileNames, State) -> lists:foreach(fun (TmpFileName) -> ok = recover_crashed_compactions1( - RefCountFun, Dir, FileNames, TmpFileName) + Dir, FileNames, TmpFileName, State) end, TmpFileNames), ok. -recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) -> +recover_crashed_compactions1(Dir, FileNames, TmpFileName, State) -> NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFileName, FileNames), {ok, UncorruptedMessagesTmp, MsgIdsTmp} = scan_file_for_valid_messages_msg_ids(Dir, TmpFileName), %% all of these messages should be referenced %% otherwise they wouldn't have been copied out - verify_messages_referenced(RefCountFun, MsgIdsTmp), + verify_messages_referenced(State, MsgIdsTmp), {ok, UncorruptedMessages, MsgIds} = scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFileName), %% 1) It's possible that everything in the tmp file is also in the @@ -740,7 +782,7 @@ recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) -> length(Rest))} end, %% Check that everything in the main file prefix is referenced - verify_messages_referenced(RefCountFun, MsgIds1), + verify_messages_referenced(State, MsgIds1), %% The main file prefix should be contiguous {Top, MsgIds1} = find_contiguous_block_prefix( lists:reverse(UncorruptedMessages1)), @@ -782,9 +824,6 @@ is_sublist(SmallerL, BiggerL) -> is_disjoint(SmallerL, BiggerL) -> lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). -verify_messages_referenced(RefCountFun, MsgIds) -> - lists:foreach(fun (MsgId) -> false = RefCountFun(MsgId) == 0 end, MsgIds). - scan_file_for_valid_messages_msg_ids(Dir, FileName) -> {ok, Messages} = scan_file_for_valid_messages(Dir, FileName), {ok, Messages, @@ -819,13 +858,14 @@ find_contiguous_block_prefix([{MsgId, _Attrs, TotalSize, ExpectedOffset} find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. -load_messages(RefCountFun, [], State) -> +load_messages([], State) -> CurFile = State #msstate.current_file, - load_messages(RefCountFun, undefined, [CurFile], State); -load_messages(RefCountFun, Files, State) -> - load_messages(RefCountFun, undefined, Files, State). + load_messages(undefined, [CurFile], State); +load_messages(Files, State) -> + load_messages(undefined, Files, State). -load_messages(_RefCountFun, Left, [], State) -> +load_messages(Left, [], State) -> + ok = prune_stale_refs(State), Offset = case sort_msg_locations_by_offset(desc, Left, State) of [] -> 0; @@ -834,20 +874,19 @@ load_messages(_RefCountFun, Left, [], State) -> MaxOffset + TotalSize end, State #msstate { current_file = Left, current_offset = Offset }; -load_messages(RefCountFun, Left, [File|Files], +load_messages(Left, [File|Files], State = #msstate { dir = Dir, file_summary = FileSummary }) -> {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case RefCountFun(MsgId) of - 0 -> {VMAcc, VTSAcc}; - RefCount -> - true = dets_ets_insert_new( - State, #msg_location { - msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset, - total_size = TotalSize, - attrs = Attrs }), + case dets_ets_lookup(State, MsgId) of + [] -> {VMAcc, VTSAcc}; + [StoreEntry] -> + ok = dets_ets_insert( + State, StoreEntry #msg_location { + file = File, offset = Offset, + total_size = TotalSize, + attrs = Attrs }), {[Obj | VMAcc], VTSAcc + TotalSize} end end, {[], 0}, Messages), @@ -862,7 +901,7 @@ load_messages(RefCountFun, Left, [File|Files], file = File, valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, left = Left, right = Right }), - load_messages(RefCountFun, File, Files, State). + load_messages(File, Files, State). %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation |
