summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl70
1 files changed, 27 insertions, 43 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d0adfdcbd3..09c285670e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -619,7 +619,7 @@ handle_cast({write, Guid, Msg},
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
case index_lookup(Guid, State) of
not_found ->
- write_message(Guid, Msg, RefCount, State);
+ write_message(Guid, Msg, 1, State);
#msg_location { ref_count = 0, file = File, offset = Offset,
total_size = TotalSize } ->
[#file_summary { locked = Locked,
@@ -628,7 +628,7 @@ handle_cast({write, Guid, Msg},
case Locked of
true ->
ok = index_delete(Guid, State),
- write_message(Guid, Msg, RefCount, State);
+ write_message(Guid, Msg, 1, State);
false ->
ok = index_update_fields(
Guid, {#msg_location.ref_count, 1}, State),
@@ -703,7 +703,6 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
[{#file_summary.locked, false},
{#file_summary.right, SrcRight}]),
true = ets:delete(FileSummaryEts, Src),
- ok = index_delete_by_file(Src, State),
noreply(
maybe_compact(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
@@ -1573,7 +1572,6 @@ combine_files(#file_summary { file = Source,
left = Destination },
#file_summary { file = Destination,
valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
right = Source },
State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
SourceName = filenum_to_name(Source),
@@ -1589,42 +1587,25 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- case DestinationContiguousTop =:= DestinationValid of
- true ->
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize);
- false ->
- {DestinationWorkList, DestinationValid} =
- find_unremoved_messages_in_file(Destination, State),
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset =/= DestinationContiguousTop ->
- %% it cannot be that Offset =:=
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- end, DestinationWorkList),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and index_state has been updated to
- %% reflect the compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:delete(TmpHdl)
- end,
+ {DestinationWorkList, DestinationValid} =
+ find_unremoved_messages_in_file(Destination, State),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ok = copy_messages(
+ DestinationWorkList, 0, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, 0, ExpectedSize),
+ {ok, DestinationValid} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, DestinationValid),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:delete(TmpHdl),
{SourceWorkList, SourceValid} =
find_unremoved_messages_in_file(Source, State),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
@@ -1643,9 +1624,12 @@ find_unremoved_messages_in_file(File,
lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
#msg_location { file = File, total_size = TotalSize,
- ref_count = RefCount,
- offset = Offset } = Entry
- when RefCount > 0 ->
+ ref_count = 0,
+ offset = Offset } ->
+ ok = Index:delete(Guid, IndexState),
+ Acc;
+ #msg_location { file = File, total_size = TotalSize,
+ offset = Offset } = Entry ->
{[ Entry | List ], TotalSize + Size};
_ ->
Acc