summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl30
-rw-r--r--src/rabbit_msg_store.erl107
2 files changed, 80 insertions, 57 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c26221842e..b7ed868b28 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -245,12 +245,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = detect_shutdown_state_and_adjust_delivered_flags(),
- ok = add_index(),
Store = rabbit_msg_store:init(Mode, base_directory(),
FileSizeLimit, ReadFileHandlesLimit,
- fun ref_count/1, EtsBPR),
+ fun msg_ref_gen/1, msg_ref_gen_init(),
+ EtsBPR),
Store1 = prune(Store),
- ok = del_index(),
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
@@ -870,21 +869,12 @@ mark_message_delivered(Key, N) ->
end,
mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
-add_index() ->
- case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
- E -> E
- end.
+msg_ref_gen_init() -> mnesia:dirty_first(rabbit_disk_queue).
-del_index() ->
- case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- %% hmm, something weird must be going on, but it's probably
- %% not the end of the world
- {aborted, {no_exists, rabbit_disk_queue,_}} -> ok;
- E1 -> E1
- end.
+msg_ref_gen('$end_of_table') -> finished;
+msg_ref_gen(Key) ->
+ [Obj] = mnesia:dirty_read(rabbit_disk_queue, Key),
+ {Obj #dq_msg_loc.msg_id, 1, mnesia:dirty_next(rabbit_disk_queue, Key)}.
prune_flush_batch(DeleteAcc, RemoveAcc, Store) ->
lists:foldl(fun (Key, ok) ->
@@ -1003,9 +993,3 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
0
end,
shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
-
-ref_count(MsgId) ->
- length(mnesia:dirty_index_match_object(
- rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId, _ = '_' },
- msg_id)).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d5959f9f38..b745acbf08 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_store).
--export([init/6, write/4, read/2, attrs/2, remove/2, release/2,
+-export([init/7, write/4, read/2, attrs/2, remove/2, release/2,
needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1,
ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]).
@@ -104,9 +104,10 @@
ets_bytes_per_record :: non_neg_integer()
}).
--spec(init/6 :: ('ram_disk' | 'disk_only', file_path(),
+-spec(init/7 :: ('ram_disk' | 'disk_only', file_path(),
non_neg_integer(), non_neg_integer(),
- fun ((msg_id()) -> non_neg_integer()), non_neg_integer()) ->
+ (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})),
+ A, non_neg_integer()) ->
msstate()).
-spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()).
-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
@@ -241,8 +242,8 @@
%% from the store when it has been removed the same number of times.
%%
%% The reference counts do not persist. Therefore the initialisation
-%% function must be provided with a function that determines the
-%% initial reference count of any (recovered) message.
+%% function must be provided with a generator that produces ref count
+%% deltas for all recovered messages.
%%
%% Read messages with a reference count greater than one are entered
%% into a message cache. The purpose of the cache is not especially
@@ -256,8 +257,8 @@
%% public API
%%----------------------------------------------------------------------------
-init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
- EtsBytesPerRecord) ->
+init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit,
+ MsgRefDeltaGen, MsgRefDeltaGenInit, EtsBytesPerRecord) ->
file:delete(msg_location_dets_file(Dir)),
@@ -298,17 +299,18 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
message_cache = MessageCache,
ets_bytes_per_record = EtsBytesPerRecord
},
-
+
+ ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
FileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
TmpFileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames),
+ ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames, State),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
Files = [filename_to_num(FileName) || FileName <- FileNames],
State1 = #msstate { current_file = CurFile, current_offset = Offset } =
- load_messages(RefCountFun, Files, State),
+ load_messages(Files, State),
%% read is only needed so that we can seek
{ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile),
@@ -666,26 +668,66 @@ dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts,
operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
+dets_ets_select_delete(#msstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, MatchSpec) ->
+ dets:select_delete(MsgLocationDets, MatchSpec);
+dets_ets_select_delete(#msstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, MatchSpec) ->
+ ets:select_delete(MsgLocationEts, MatchSpec).
+
%%----------------------------------------------------------------------------
%% recovery
%%----------------------------------------------------------------------------
-recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames) ->
+count_msg_refs(Gen, Seed, State) ->
+ case Gen(Seed) of
+ finished -> ok;
+ {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State);
+ {MsgId, Delta, Next} ->
+ case dets_ets_lookup(State, MsgId) of
+ [] -> true = dets_ets_insert_new(
+ State, #msg_location { msg_id = MsgId,
+ ref_count = Delta });
+ [StoreEntry = #msg_location { msg_id = MsgId,
+ ref_count = RefCount }] ->
+ NewRefCount = RefCount + Delta,
+ case NewRefCount of
+ 0 -> ok = dets_ets_delete(State, MsgId);
+ _ -> ok = dets_ets_insert(
+ State, StoreEntry #msg_location {
+ ref_count = NewRefCount })
+ end
+ end,
+ count_msg_refs(Gen, Next, State)
+ end.
+
+verify_messages_referenced(State, MsgIds) ->
+ lists:foreach(fun (MsgId) -> [_] = dets_ets_lookup(State, MsgId) end,
+ MsgIds).
+
+prune_stale_refs(State) ->
+ MatchHead = #msg_location { file = undefined, _ = '_' },
+ case dets_ets_select_delete(State, [{MatchHead, [], [true]}]) of
+ N when is_number(N) -> ok;
+ Other -> Other
+ end.
+
+recover_crashed_compactions(Dir, FileNames, TmpFileNames, State) ->
lists:foreach(fun (TmpFileName) ->
ok = recover_crashed_compactions1(
- RefCountFun, Dir, FileNames, TmpFileName)
+ Dir, FileNames, TmpFileName, State)
end,
TmpFileNames),
ok.
-recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) ->
+recover_crashed_compactions1(Dir, FileNames, TmpFileName, State) ->
NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFileName, FileNames),
{ok, UncorruptedMessagesTmp, MsgIdsTmp} =
scan_file_for_valid_messages_msg_ids(Dir, TmpFileName),
%% all of these messages should be referenced
%% otherwise they wouldn't have been copied out
- verify_messages_referenced(RefCountFun, MsgIdsTmp),
+ verify_messages_referenced(State, MsgIdsTmp),
{ok, UncorruptedMessages, MsgIds} =
scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFileName),
%% 1) It's possible that everything in the tmp file is also in the
@@ -740,7 +782,7 @@ recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) ->
length(Rest))}
end,
%% Check that everything in the main file prefix is referenced
- verify_messages_referenced(RefCountFun, MsgIds1),
+ verify_messages_referenced(State, MsgIds1),
%% The main file prefix should be contiguous
{Top, MsgIds1} = find_contiguous_block_prefix(
lists:reverse(UncorruptedMessages1)),
@@ -782,9 +824,6 @@ is_sublist(SmallerL, BiggerL) ->
is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-verify_messages_referenced(RefCountFun, MsgIds) ->
- lists:foreach(fun (MsgId) -> false = RefCountFun(MsgId) == 0 end, MsgIds).
-
scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, FileName),
{ok, Messages,
@@ -819,13 +858,14 @@ find_contiguous_block_prefix([{MsgId, _Attrs, TotalSize, ExpectedOffset}
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
-load_messages(RefCountFun, [], State) ->
+load_messages([], State) ->
CurFile = State #msstate.current_file,
- load_messages(RefCountFun, undefined, [CurFile], State);
-load_messages(RefCountFun, Files, State) ->
- load_messages(RefCountFun, undefined, Files, State).
+ load_messages(undefined, [CurFile], State);
+load_messages(Files, State) ->
+ load_messages(undefined, Files, State).
-load_messages(_RefCountFun, Left, [], State) ->
+load_messages(Left, [], State) ->
+ ok = prune_stale_refs(State),
Offset =
case sort_msg_locations_by_offset(desc, Left, State) of
[] -> 0;
@@ -834,20 +874,19 @@ load_messages(_RefCountFun, Left, [], State) ->
MaxOffset + TotalSize
end,
State #msstate { current_file = Left, current_offset = Offset };
-load_messages(RefCountFun, Left, [File|Files],
+load_messages(Left, [File|Files],
State = #msstate { dir = Dir, file_summary = FileSummary }) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} = lists:foldl(
fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case RefCountFun(MsgId) of
- 0 -> {VMAcc, VTSAcc};
- RefCount ->
- true = dets_ets_insert_new(
- State, #msg_location {
- msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize,
- attrs = Attrs }),
+ case dets_ets_lookup(State, MsgId) of
+ [] -> {VMAcc, VTSAcc};
+ [StoreEntry] ->
+ ok = dets_ets_insert(
+ State, StoreEntry #msg_location {
+ file = File, offset = Offset,
+ total_size = TotalSize,
+ attrs = Attrs }),
{[Obj | VMAcc], VTSAcc + TotalSize}
end
end, {[], 0}, Messages),
@@ -862,7 +901,7 @@ load_messages(RefCountFun, Left, [File|Files],
file = File, valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
left = Left, right = Right }),
- load_messages(RefCountFun, File, Files, State).
+ load_messages(File, Files, State).
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation