summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-08 01:02:17 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-08 01:02:17 +0100
commit910debb20f4cc6ea4982c27495722bfeb4441a64 (patch)
tree6021d83f9ca459c11adb969b5349e526efbad268
parent62e114a90a73265b58e077e388814467ea19bf9d (diff)
downloadrabbitmq-server-git-910debb20f4cc6ea4982c27495722bfeb4441a64.tar.gz
Fixed the bug. The problem was that I was not removing msgs from the index for the dest in a gc if they had a refcount of 0. Thus they could later be reincremented. I was also thoughtlessly doing an index_delete_by_file on the src post gc, which should be unnecessary - 0 refcounts are now removed as they're discovered, and non-zero will be updated to be in the dest. This leaves only 1 worrying use of index_delete_by_file which is now in delete_file_if_empty. Here, scanning and deleting as we find msgs will be constant time and simple. Note this also features a very rough hacking out of the use of contiguous top in GC which turns out to be essential for this bug. However the real fix for that will be in bug 23233 which will block this bug.
-rw-r--r--src/rabbit_msg_store.erl70
1 files changed, 27 insertions, 43 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d0adfdcbd3..09c285670e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -619,7 +619,7 @@ handle_cast({write, Guid, Msg},
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
case index_lookup(Guid, State) of
not_found ->
- write_message(Guid, Msg, RefCount, State);
+ write_message(Guid, Msg, 1, State);
#msg_location { ref_count = 0, file = File, offset = Offset,
total_size = TotalSize } ->
[#file_summary { locked = Locked,
@@ -628,7 +628,7 @@ handle_cast({write, Guid, Msg},
case Locked of
true ->
ok = index_delete(Guid, State),
- write_message(Guid, Msg, RefCount, State);
+ write_message(Guid, Msg, 1, State);
false ->
ok = index_update_fields(
Guid, {#msg_location.ref_count, 1}, State),
@@ -703,7 +703,6 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
[{#file_summary.locked, false},
{#file_summary.right, SrcRight}]),
true = ets:delete(FileSummaryEts, Src),
- ok = index_delete_by_file(Src, State),
noreply(
maybe_compact(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
@@ -1573,7 +1572,6 @@ combine_files(#file_summary { file = Source,
left = Destination },
#file_summary { file = Destination,
valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
right = Source },
State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
SourceName = filenum_to_name(Source),
@@ -1589,42 +1587,25 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- case DestinationContiguousTop =:= DestinationValid of
- true ->
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize);
- false ->
- {DestinationWorkList, DestinationValid} =
- find_unremoved_messages_in_file(Destination, State),
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset =/= DestinationContiguousTop ->
- %% it cannot be that Offset =:=
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- end, DestinationWorkList),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and index_state has been updated to
- %% reflect the compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:delete(TmpHdl)
- end,
+ {DestinationWorkList, DestinationValid} =
+ find_unremoved_messages_in_file(Destination, State),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ok = copy_messages(
+ DestinationWorkList, 0, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, 0, ExpectedSize),
+ {ok, DestinationValid} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, DestinationValid),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:delete(TmpHdl),
{SourceWorkList, SourceValid} =
find_unremoved_messages_in_file(Source, State),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
@@ -1643,9 +1624,12 @@ find_unremoved_messages_in_file(File,
lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
#msg_location { file = File, total_size = TotalSize,
- ref_count = RefCount,
- offset = Offset } = Entry
- when RefCount > 0 ->
+ ref_count = 0,
+ offset = Offset } ->
+ ok = Index:delete(Guid, IndexState),
+ Acc;
+ #msg_location { file = File, total_size = TotalSize,
+ offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
_ ->
Acc