summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-10 00:04:14 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-10 00:04:14 +0100
commiteeb480d1e5d8fec0d98196f4332d115fa5c98c2a (patch)
tree02c870423aa1dd4cddd2f51341dc768f8b58f724
parentcd6aed9e02dd24c59a4ddd62184ee7b0e290622c (diff)
downloadrabbitmq-server-git-eeb480d1e5d8fec0d98196f4332d115fa5c98c2a.tar.gz
initialise msg_store ref counts via a generator
This is more flexible than the previous ref_count function, allowing the ref counts to be obtained without consuming any memory at the supplying end in a variety of scenarios. We use the dq_msg_loc ets/dets table to store the ref counts. That table is later updated with the full details of the messages (their file and position, etc). At the end we prune any entries that have a ref count but no associated file - i.e. the referenced message couldn't be found on disk. This change should also fix the "All replicas on diskfull nodes are not active yet" error observed in bug 21530 since we no longer need the indices on the rabbit_disk_queue mnesia table which we identified as the most likely cause of that error.
-rw-r--r--src/rabbit_disk_queue.erl30
-rw-r--r--src/rabbit_msg_store.erl107
2 files changed, 80 insertions, 57 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c26221842e..b7ed868b28 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -245,12 +245,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
ok = detect_shutdown_state_and_adjust_delivered_flags(),
- ok = add_index(),
Store = rabbit_msg_store:init(Mode, base_directory(),
FileSizeLimit, ReadFileHandlesLimit,
- fun ref_count/1, EtsBPR),
+ fun msg_ref_gen/1, msg_ref_gen_init(),
+ EtsBPR),
Store1 = prune(Store),
- ok = del_index(),
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
@@ -870,21 +869,12 @@ mark_message_delivered(Key, N) ->
end,
mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
-add_index() ->
- case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
- E -> E
- end.
+msg_ref_gen_init() -> mnesia:dirty_first(rabbit_disk_queue).
-del_index() ->
- case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- %% hmm, something weird must be going on, but it's probably
- %% not the end of the world
- {aborted, {no_exists, rabbit_disk_queue,_}} -> ok;
- E1 -> E1
- end.
+msg_ref_gen('$end_of_table') -> finished;
+msg_ref_gen(Key) ->
+ [Obj] = mnesia:dirty_read(rabbit_disk_queue, Key),
+ {Obj #dq_msg_loc.msg_id, 1, mnesia:dirty_next(rabbit_disk_queue, Key)}.
prune_flush_batch(DeleteAcc, RemoveAcc, Store) ->
lists:foldl(fun (Key, ok) ->
@@ -1003,9 +993,3 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
0
end,
shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
-
-ref_count(MsgId) ->
- length(mnesia:dirty_index_match_object(
- rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId, _ = '_' },
- msg_id)).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d5959f9f38..b745acbf08 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -31,7 +31,7 @@
-module(rabbit_msg_store).
--export([init/6, write/4, read/2, attrs/2, remove/2, release/2,
+-export([init/7, write/4, read/2, attrs/2, remove/2, release/2,
needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1,
ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]).
@@ -104,9 +104,10 @@
ets_bytes_per_record :: non_neg_integer()
}).
--spec(init/6 :: ('ram_disk' | 'disk_only', file_path(),
+-spec(init/7 :: ('ram_disk' | 'disk_only', file_path(),
non_neg_integer(), non_neg_integer(),
- fun ((msg_id()) -> non_neg_integer()), non_neg_integer()) ->
+ (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})),
+ A, non_neg_integer()) ->
msstate()).
-spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()).
-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
@@ -241,8 +242,8 @@
%% from the store when it has been removed the same number of times.
%%
%% The reference counts do not persist. Therefore the initialisation
-%% function must be provided with a function that determines the
-%% initial reference count of any (recovered) message.
+%% function must be provided with a generator that produces ref count
+%% deltas for all recovered messages.
%%
%% Read messages with a reference count greater than one are entered
%% into a message cache. The purpose of the cache is not especially
@@ -256,8 +257,8 @@
%% public API
%%----------------------------------------------------------------------------
-init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
- EtsBytesPerRecord) ->
+init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit,
+ MsgRefDeltaGen, MsgRefDeltaGenInit, EtsBytesPerRecord) ->
file:delete(msg_location_dets_file(Dir)),
@@ -298,17 +299,18 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, RefCountFun,
message_cache = MessageCache,
ets_bytes_per_record = EtsBytesPerRecord
},
-
+
+ ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
FileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
TmpFileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames),
+ ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames, State),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
Files = [filename_to_num(FileName) || FileName <- FileNames],
State1 = #msstate { current_file = CurFile, current_offset = Offset } =
- load_messages(RefCountFun, Files, State),
+ load_messages(Files, State),
%% read is only needed so that we can seek
{ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile),
@@ -666,26 +668,66 @@ dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts,
operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
+dets_ets_select_delete(#msstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only }, MatchSpec) ->
+ dets:select_delete(MsgLocationDets, MatchSpec);
+dets_ets_select_delete(#msstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk }, MatchSpec) ->
+ ets:select_delete(MsgLocationEts, MatchSpec).
+
%%----------------------------------------------------------------------------
%% recovery
%%----------------------------------------------------------------------------
-recover_crashed_compactions(RefCountFun, Dir, FileNames, TmpFileNames) ->
+count_msg_refs(Gen, Seed, State) ->
+ case Gen(Seed) of
+ finished -> ok;
+ {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State);
+ {MsgId, Delta, Next} ->
+ case dets_ets_lookup(State, MsgId) of
+ [] -> true = dets_ets_insert_new(
+ State, #msg_location { msg_id = MsgId,
+ ref_count = Delta });
+ [StoreEntry = #msg_location { msg_id = MsgId,
+ ref_count = RefCount }] ->
+ NewRefCount = RefCount + Delta,
+ case NewRefCount of
+ 0 -> ok = dets_ets_delete(State, MsgId);
+ _ -> ok = dets_ets_insert(
+ State, StoreEntry #msg_location {
+ ref_count = NewRefCount })
+ end
+ end,
+ count_msg_refs(Gen, Next, State)
+ end.
+
+verify_messages_referenced(State, MsgIds) ->
+ lists:foreach(fun (MsgId) -> [_] = dets_ets_lookup(State, MsgId) end,
+ MsgIds).
+
+prune_stale_refs(State) ->
+ MatchHead = #msg_location { file = undefined, _ = '_' },
+ case dets_ets_select_delete(State, [{MatchHead, [], [true]}]) of
+ N when is_number(N) -> ok;
+ Other -> Other
+ end.
+
+recover_crashed_compactions(Dir, FileNames, TmpFileNames, State) ->
lists:foreach(fun (TmpFileName) ->
ok = recover_crashed_compactions1(
- RefCountFun, Dir, FileNames, TmpFileName)
+ Dir, FileNames, TmpFileName, State)
end,
TmpFileNames),
ok.
-recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) ->
+recover_crashed_compactions1(Dir, FileNames, TmpFileName, State) ->
NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFileName, FileNames),
{ok, UncorruptedMessagesTmp, MsgIdsTmp} =
scan_file_for_valid_messages_msg_ids(Dir, TmpFileName),
%% all of these messages should be referenced
%% otherwise they wouldn't have been copied out
- verify_messages_referenced(RefCountFun, MsgIdsTmp),
+ verify_messages_referenced(State, MsgIdsTmp),
{ok, UncorruptedMessages, MsgIds} =
scan_file_for_valid_messages_msg_ids(Dir, NonTmpRelatedFileName),
%% 1) It's possible that everything in the tmp file is also in the
@@ -740,7 +782,7 @@ recover_crashed_compactions1(RefCountFun, Dir, FileNames, TmpFileName) ->
length(Rest))}
end,
%% Check that everything in the main file prefix is referenced
- verify_messages_referenced(RefCountFun, MsgIds1),
+ verify_messages_referenced(State, MsgIds1),
%% The main file prefix should be contiguous
{Top, MsgIds1} = find_contiguous_block_prefix(
lists:reverse(UncorruptedMessages1)),
@@ -782,9 +824,6 @@ is_sublist(SmallerL, BiggerL) ->
is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-verify_messages_referenced(RefCountFun, MsgIds) ->
- lists:foreach(fun (MsgId) -> false = RefCountFun(MsgId) == 0 end, MsgIds).
-
scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, FileName),
{ok, Messages,
@@ -819,13 +858,14 @@ find_contiguous_block_prefix([{MsgId, _Attrs, TotalSize, ExpectedOffset}
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
-load_messages(RefCountFun, [], State) ->
+load_messages([], State) ->
CurFile = State #msstate.current_file,
- load_messages(RefCountFun, undefined, [CurFile], State);
-load_messages(RefCountFun, Files, State) ->
- load_messages(RefCountFun, undefined, Files, State).
+ load_messages(undefined, [CurFile], State);
+load_messages(Files, State) ->
+ load_messages(undefined, Files, State).
-load_messages(_RefCountFun, Left, [], State) ->
+load_messages(Left, [], State) ->
+ ok = prune_stale_refs(State),
Offset =
case sort_msg_locations_by_offset(desc, Left, State) of
[] -> 0;
@@ -834,20 +874,19 @@ load_messages(_RefCountFun, Left, [], State) ->
MaxOffset + TotalSize
end,
State #msstate { current_file = Left, current_offset = Offset };
-load_messages(RefCountFun, Left, [File|Files],
+load_messages(Left, [File|Files],
State = #msstate { dir = Dir, file_summary = FileSummary }) ->
{ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} = lists:foldl(
fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case RefCountFun(MsgId) of
- 0 -> {VMAcc, VTSAcc};
- RefCount ->
- true = dets_ets_insert_new(
- State, #msg_location {
- msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize,
- attrs = Attrs }),
+ case dets_ets_lookup(State, MsgId) of
+ [] -> {VMAcc, VTSAcc};
+ [StoreEntry] ->
+ ok = dets_ets_insert(
+ State, StoreEntry #msg_location {
+ file = File, offset = Offset,
+ total_size = TotalSize,
+ attrs = Attrs }),
{[Obj | VMAcc], VTSAcc + TotalSize}
end
end, {[], 0}, Messages),
@@ -862,7 +901,7 @@ load_messages(RefCountFun, Left, [File|Files],
file = File, valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
left = Left, right = Right }),
- load_messages(RefCountFun, File, Files, State).
+ load_messages(File, Files, State).
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation