author    Matthew Sackman <matthew@lshift.net>  2009-12-17 18:42:12 +0000
committer Matthew Sackman <matthew@lshift.net>  2009-12-17 18:42:12 +0000
commit    78adc6d026300c7ebd29abf97e4f295e949e5636 (patch)
tree      12fb30ead5cc0bc078118291b811e406dc33efd8 /src
parent    719a8ff67308eb1b80a61ada6bfae7b7ef8da64b (diff)
download  rabbitmq-server-git-78adc6d026300c7ebd29abf97e4f295e949e5636.tar.gz
All sorts of stuff. See the bug. GC is off here (other than deleting empty files). File locking is in. Some other machinery is in w.r.t. background GC. Lots of reworking of the GC code to get it into a more useful state (everything from adjust_meta downwards). Tests do actually pass, but with GC otherwise off you'd be mad to run this.
Diffstat (limited to 'src')
-rw-r--r--  src/rabbit_misc.erl      |   6
-rw-r--r--  src/rabbit_msg_store.erl | 459
2 files changed, 271 insertions(+), 194 deletions(-)
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 97c96fc771..9f74f60403 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -487,9 +487,9 @@ unfold(Fun, Acc, Init) ->
ceil(N) ->
T = trunc(N),
- case N - T of
- 0 -> N;
- _ -> 1 + T
+ case N == T of
+ true -> T;
+ false -> 1 + T
end.
%% Sorts a list of AMQP table fields as per the AMQP spec
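Why the ceil/1 change is needed: the old clause pattern-matched the integer 0, but when N is a float, N - T is a float, and 0.0 does not pattern-match 0, so e.g. ceil(2.0) fell through to the catch-all and returned 3. Arithmetic equality (==) compares across integer/float, so the new version returns T for whole floats. A quick shell check of the underlying semantics (illustrative, not from the source):

    1> 2.0 - 2.
    0.0
    2> 2.0 == 2.
    true
    3> T = trunc(2.0), case 2.0 == T of true -> T; false -> 1 + T end.
    2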
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 0702cf3690..b8373fd131 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -83,20 +83,30 @@
sync_timer_ref, %% TRef for our interval timer
message_cache, %% ets message cache
sum_valid_data, %% sum of valid data in all files
- sum_file_size %% sum of file sizes
- }).
+ sum_file_size, %% sum of file sizes
+ pending_gc_completion, %% things to do once GC completes
+ gc_pid %% pid of the GC process
+ }).
-record(msg_location,
{msg_id, ref_count, file, offset, total_size}).
-record(file_summary,
- {file, valid_total_size, contiguous_top, left, right, file_size}).
+ {file, valid_total_size, contiguous_top, left, right, file_size,
+ locked}).
+
+-record(gcstate,
+ {dir
+ }).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+%% We run GC whenever the amount of garbage exceeds
+%% ?GARBAGE_FRACTION * (total valid data)
+-define(GARBAGE_FRACTION, 1.0).
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
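GARBAGE_FRACTION feeds the guard in maybe_compact/1 further down: compaction becomes eligible once the garbage in the store (sum_file_size minus sum_valid_data) exceeds GARBAGE_FRACTION times the valid data. A minimal sketch of that arithmetic (should_gc/2 is a hypothetical helper, not part of this module):

    should_gc(SumValid, SumFileSize) ->
        (SumFileSize - SumValid) > ?GARBAGE_FRACTION * SumValid.

    %% e.g. with 10MB of valid data and GARBAGE_FRACTION = 1.0, GC only
    %% becomes eligible once the files on disk total more than 20MB.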
@@ -248,11 +258,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
MsgLocations = ets:new(?MSG_LOC_NAME,
- [set, private, {keypos, #msg_location.msg_id}]),
+ [set, protected, {keypos, #msg_location.msg_id}]),
InitFile = 0,
FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME,
- [set, private, {keypos, #file_summary.file}]),
+ [ordered_set, protected,
+ {keypos, #file_summary.file}]),
MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]),
State =
#msstate { dir = Dir,
@@ -266,7 +277,9 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
sync_timer_ref = undefined,
message_cache = MessageCache,
sum_valid_data = 0,
- sum_file_size = 0
+ sum_file_size = 0,
+ pending_gc_completion = [],
+ gc_pid = undefined
},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -289,52 +302,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{ok, State1 #msstate { current_file_handle = FileHdl }}.
-handle_call({read, MsgId}, _From, State =
- #msstate { current_file = CurFile,
- current_file_handle = CurHdl }) ->
- {Result, State1} =
- case index_lookup(MsgId, State) of
- not_found -> {not_found, State};
- #msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize } ->
- case fetch_and_increment_cache(MsgId, State) of
- not_found ->
- ok = case CurFile =:= File andalso {ok, Offset} >=
- file_handle_cache:current_raw_offset(CurHdl) of
- true -> file_handle_cache:flush(CurHdl);
- false -> ok
- end,
- {Hdl, State2} = get_read_handle(File, State),
- {ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {MsgId, Msg}} =
- case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- Rest ->
- throw({error, {misread, [{old_state, State},
- {file_num, File},
- {offset, Offset},
- {read, Rest},
- {proc_dict, get()}
- ]}})
- end,
- ok = case RefCount > 1 of
- true ->
- insert_into_cache(MsgId, Msg, State2);
- false ->
- %% it's not in the cache and we
- %% only have one reference to the
- %% message. So don't bother
- %% putting it in the cache.
- ok
- end,
- {{ok, Msg}, State2};
- {Msg, _RefCount} ->
- {{ok, Msg}, State}
- end
- end,
- reply(Result, State1);
+handle_call({read, MsgId}, From, State) ->
+ case read_message(MsgId, State) of
+ {ok, Msg, State1} -> reply({ok, Msg}, State1);
+ {blocked, State1} -> noreply(add_to_pending_gc_completion(
+ {read, MsgId, From}, State1))
+ end;
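This is the heart of the new locking protocol: a read that hits a file locked for GC cannot be served, so rather than replying, the server parks {read, MsgId, From} in pending_gc_completion and returns noreply; gen_server2:reply/2 answers the caller later from run_pending/2. From the client's side the deferral is invisible, only the latency changes. A sketch, assuming read is the usual gen_server2 call wrapper:

    {ok, Msg} = gen_server2:call(?SERVER, {read, MsgId}, infinity).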
handle_call({contains, MsgId}, _From, State) ->
reply(case index_lookup(MsgId, State) of
@@ -360,6 +333,7 @@ handle_cast({write, MsgId, Msg},
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
right = undefined,
+ locked = false,
file_size = FileSize }] =
ets:lookup(FileSummary, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
@@ -373,10 +347,11 @@ handle_cast({write, MsgId, Msg},
contiguous_top = ContiguousTop1,
file_size = FileSize + TotalSize }),
NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset,
- State #msstate { sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize }));
+ noreply(maybe_compact(maybe_roll_to_new_file(
+ NextOffset, State #msstate
+ { sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }
+ )));
StoreEntry = #msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter
ok = index_update(StoreEntry #msg_location {
@@ -384,20 +359,11 @@ handle_cast({write, MsgId, Msg},
noreply(State)
end;
-handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
- {Files, State1} =
- lists:foldl(
- fun (MsgId, {Files1, State2}) ->
- case remove_message(MsgId, State2) of
- {compact, File, State3} ->
- {if CurFile =:= File -> Files1;
- true -> sets:add_element(File, Files1)
- end, State3};
- {no_compact, State3} ->
- {Files1, State3}
- end
- end, {sets:new(), State}, MsgIds),
- noreply(compact(sets:to_list(Files), State1));
+handle_cast({remove, MsgIds}, State) ->
+ State1 = lists:foldl(
+ fun (MsgId, State2) -> remove_message(MsgId, State2) end,
+ State, MsgIds),
+ noreply(maybe_compact(State1));
handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
@@ -421,11 +387,23 @@ handle_cast({sync, MsgIds, K},
handle_cast(sync, State) ->
noreply(sync(State)).
+%% handle_cast({gc_finished, GCPid, RemainingFile, DeletedFile, MsgLocations},
+%% State = #msstate { file_summary = FileSummary,
+%% gc_pid = GCPid }) ->
+%% true = ets:delete(FileSummary, DeletedFile),
+%% true = ets:insert(FileSummary, RemainingFile),
+%% State1 = lists:foldl(fun index_insert/2, State, MsgLocations),
+%% noreply(maybe_compact(run_pending(State1))).
+
handle_info(timeout, State) ->
noreply(sync(State));
handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
+handle_info({'EXIT', _Pid, normal}, State) ->
+ %% this is just the GC process going down
noreply(State).
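The {'EXIT', _Pid, normal} clause only fires if the server traps exits; a non-trapping process silently ignores normal exit signals from links, and would be killed outright by an abnormal one from the spawn_link'ed collector. Presumably init/1 therefore sets (an assumption, the call is not visible in these hunks):

    process_flag(trap_exit, true),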
terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
@@ -510,6 +488,63 @@ sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
+read_message(MsgId, State =
+ #msstate { current_file = CurFile,
+ current_file_handle = CurHdl,
+ file_summary = FileSummary }) ->
+ case index_lookup(MsgId, State) of
+ not_found -> {ok, not_found, State};
+ #msg_location { ref_count = RefCount,
+ file = File,
+ offset = Offset,
+ total_size = TotalSize } ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found ->
+ [#file_summary { locked = Locked }] =
+ ets:lookup(FileSummary, File),
+ case Locked of
+ true ->
+ {blocked, State};
+ false ->
+ ok = case CurFile =:= File andalso {ok, Offset} >=
+ file_handle_cache:current_raw_offset(
+ CurHdl) of
+ true -> file_handle_cache:flush(CurHdl);
+ false -> ok
+ end,
+ {Hdl, State1} = get_read_handle(File, State),
+ {ok, Offset} =
+ file_handle_cache:position(Hdl, Offset),
+ {ok, {MsgId, Msg}} =
+ case rabbit_msg_file:read(Hdl, TotalSize) of
+ {ok, {MsgId, _}} = Obj -> Obj;
+ Rest ->
+ throw({error, {misread,
+ [{old_state, State},
+ {file_num, File},
+ {offset, Offset},
+ {read, Rest},
+ {proc_dict, get()}
+ ]}})
+ end,
+ ok = case RefCount > 1 of
+ true ->
+ insert_into_cache(MsgId, Msg, State1);
+ false ->
+ %% it's not in the cache and
+ %% we only have one reference
+ %% to the message. So don't
+ %% bother putting it in the
+ %% cache.
+ ok
+ end,
+ {ok, Msg, State1}
+ end;
+ {Msg, _RefCount} ->
+ {ok, Msg, State}
+ end
+ end.
+
remove_message(MsgId, State = #msstate { file_summary = FileSummary,
sum_valid_data = SumValid }) ->
StoreEntry = #msg_location { ref_count = RefCount, file = File,
@@ -517,25 +552,50 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary,
index_lookup(MsgId, State),
case RefCount of
1 ->
- ok = index_delete(MsgId, State),
ok = remove_cache_entry(MsgId, State),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop }] =
+ contiguous_top = ContiguousTop,
+ locked = Locked }] =
ets:lookup(FileSummary, File),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- ValidTotalSize1 = ValidTotalSize - TotalSize,
- true = ets:insert(FileSummary, FSEntry #file_summary {
- valid_total_size = ValidTotalSize1,
- contiguous_top = ContiguousTop1 }),
- {compact, File, State #msstate {
- sum_valid_data = SumValid - TotalSize }};
+ case Locked of
+ true ->
+ add_to_pending_gc_completion({remove, MsgId}, State);
+ false ->
+ ok = index_delete(MsgId, State),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ ValidTotalSize1 = ValidTotalSize - TotalSize,
+ true = ets:insert(
+ FileSummary, FSEntry #file_summary {
+ valid_total_size = ValidTotalSize1,
+ contiguous_top = ContiguousTop1 }),
+ State1 = delete_file_if_empty(File, State),
+ State1 #msstate { sum_valid_data = SumValid - TotalSize }
+ end;
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
- ok = index_update(StoreEntry #msg_location {
- ref_count = RefCount - 1 }, State),
- {no_compact, State}
+ ok = index_update(StoreEntry #msg_location
+ { ref_count = RefCount - 1 }, State),
+ State
end.
+add_to_pending_gc_completion(
+ Op, State = #msstate { pending_gc_completion = Pending }) ->
+ State #msstate { pending_gc_completion = [Op | Pending] }.
+
+run_pending(State = #msstate { pending_gc_completion = Pending }) ->
+ State1 = State #msstate { pending_gc_completion = [] },
+ lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
+
+run_pending({read, MsgId, From}, State) ->
+ case read_message(MsgId, State) of
+ {ok, Msg, State1} -> gen_server2:reply(From, {ok, Msg}),
+ State1;
+ {blocked, State1} -> add_to_pending_gc_completion(
+ {read, MsgId, From}, State1)
+ end;
+run_pending({remove, MsgId}, State) ->
+ remove_message(MsgId, State).
+
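Together these clauses drain the backlog that built up while files were locked; a parked read that is still blocked simply re-parks itself. The intended lifecycle, pieced together from the commented-out gc_finished handler above (so partly an assumption about how it will be wired up):

    %% 1. maybe_compact/1 locks two adjacent files and spawns the collector
    %% 2. reads and removes touching those files accumulate in
    %%    pending_gc_completion instead of hitting the files
    %% 3. on gc_finished the file summary and index are patched up and
    %%    run_pending/1 replays the backlog in arrival order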
close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
case dict:find(Key, FHC) of
{ok, Hdl} ->
@@ -791,20 +851,19 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
build_index([], State) ->
- build_index(undefined, [State #msstate.current_file], [], State);
+ build_index(undefined, [State #msstate.current_file], State);
build_index(Files, State) ->
- build_index(undefined, Files, [], State).
+ {Offset, State1} = build_index(undefined, Files, State),
+ {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}.
-build_index(Left, [], FilesToCompact, State =
- #msstate { file_summary = FileSummary }) ->
+build_index(Left, [], State = #msstate { file_summary = FileSummary }) ->
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummary, Left) of
[] -> 0;
[#file_summary { file_size = FileSize }] -> FileSize
end,
- {Offset, compact(FilesToCompact, %% this never includes the current file
- State #msstate { current_file = Left })};
-build_index(Left, [File|Files], FilesToCompact,
+ {Offset, State #msstate { current_file = Left }};
+build_index(Left, [File|Files],
State = #msstate { dir = Dir, file_summary = FileSummary,
sum_valid_data = SumValid,
sum_file_size = SumFileSize }) ->
@@ -842,14 +901,9 @@ build_index(Left, [File|Files], FilesToCompact,
true =
ets:insert_new(FileSummary, #file_summary {
file = File, valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
+ contiguous_top = ContiguousTop, locked = false,
left = Left, right = Right, file_size = FileSize1 }),
- FilesToCompact1 =
- case FileSize1 == ContiguousTop orelse Right =:= undefined of
- true -> FilesToCompact;
- false -> [File | FilesToCompact]
- end,
- build_index(File, Files, FilesToCompact1,
+ build_index(File, Files,
State #msstate { sum_valid_data = SumValid + ValidTotalSize,
sum_file_size = SumFileSize + FileSize1 }).
@@ -873,13 +927,27 @@ maybe_roll_to_new_file(Offset,
true = ets:insert_new(
FileSummary, #file_summary {
file = NextFile, valid_total_size = 0, contiguous_top = 0,
- left = CurFile, right = undefined, file_size = 0 }),
- State2 = State1 #msstate { current_file_handle = NextHdl,
- current_file = NextFile },
- compact([CurFile], State2);
+ left = CurFile, right = undefined, file_size = 0,
+ locked = false }),
+ State1 #msstate { current_file_handle = NextHdl,
+ current_file = NextFile };
maybe_roll_to_new_file(_, State) ->
State.
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ gc_pid = undefined,
+ file_summary = FileSummary })
+ when (SumFileSize - SumValid) > ?GARBAGE_FRACTION * SumValid ->
+ %% Pid = spawn_link(fun() ->
+ %% io:format("GC process!~n")
+ %% %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}),
+ %% end),
+ %% State #msstate { gc_pid = Pid };
+ State;
+maybe_compact(State) ->
+ State.
+
compact(Files, State) ->
%% smallest number, hence eldest, hence left-most, first
SortedFiles = lists:sort(Files),
@@ -932,30 +1000,25 @@ combine_file(File, State = #msstate { file_summary = FileSummary,
adjust_meta_and_combine(
LeftObj = #file_summary {
file = LeftFile, valid_total_size = LeftValidData, right = RightFile,
- file_size = LeftFileSize },
+ file_size = LeftFileSize, locked = true },
RightObj = #file_summary {
file = RightFile, valid_total_size = RightValidData, left = LeftFile,
- right = RightRight, file_size = RightFileSize },
- State = #msstate { file_size_limit = FileSizeLimit,
- file_summary = FileSummary,
- sum_file_size = SumFileSize }) ->
+ right = RightRight, file_size = RightFileSize, locked = true },
+ State) ->
TotalValidData = LeftValidData + RightValidData,
- if FileSizeLimit >= TotalValidData ->
- State1 = combine_files(RightObj, LeftObj, State),
- %% this could fail if RightRight is undefined
- ets:update_element(FileSummary, RightRight,
- {#file_summary.left, LeftFile}),
- true = ets:insert(FileSummary, LeftObj #file_summary {
- valid_total_size = TotalValidData,
- contiguous_top = TotalValidData,
- file_size = TotalValidData,
- right = RightRight }),
- true = ets:delete(FileSummary, RightFile),
- {true, State1 #msstate { sum_file_size =
- SumFileSize - LeftFileSize - RightFileSize
- + TotalValidData }};
- true -> {false, State}
- end.
+ {NewMsgLocs, State1} = combine_files(RightObj, LeftObj, State),
+ %% %% this could fail if RightRight is undefined
+ %% ets:update_element(FileSummary, RightRight,
+ %% {#file_summary.left, LeftFile}),
+ %% true = ets:delete(FileSummary, RightFile),
+ LeftObj1 = LeftObj #file_summary {
+ valid_total_size = TotalValidData,
+ contiguous_top = TotalValidData,
+ file_size = TotalValidData,
+ right = RightRight },
+ {RightFile, LeftObj1, NewMsgLocs,
+ TotalValidData - LeftFileSize - RightFileSize,
+ State1}.
combine_files(#file_summary { file = Source,
valid_total_size = SourceValid,
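Note that adjust_meta_and_combine no longer mutates the ETS tables itself (the old ets:update_element and ets:delete calls survive only as comments); it now hands back everything the server needs to apply once GC completes, matching the payload of the commented-out gc_finished cast. Roughly (these names are descriptive, not from the source):

    {FileToDelete, UpdatedLeftEntry, NewMsgLocs, FileSizeDelta, GCState}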
@@ -964,7 +1027,7 @@ combine_files(#file_summary { file = Source,
valid_total_size = DestinationValid,
contiguous_top = DestinationContiguousTop,
right = Source },
- State = #msstate { dir = Dir }) ->
+ State = #gcstate { dir = Dir }) ->
State1 = close_handle(Source, close_handle(Destination, State)),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -978,54 +1041,62 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- if DestinationContiguousTop =:= DestinationValid ->
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationValid, ExpectedSize);
- true ->
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset ==
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect
- %% that the list should be naturally sorted
- %% as we require, however, we need to
- %% enforce it anyway
- end, find_unremoved_messages_in_file(Destination, State1)),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State1),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and MsgLocation has been updated to
- %% reflect compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:close(TmpHdl),
- ok = file:delete(form_filename(Dir, Tmp))
- end,
+ NewDestLocs =
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = truncate_and_extend_file(DestinationHdl,
+ DestinationValid, ExpectedSize),
+ [];
+ true ->
+ Worklist =
+ lists:dropwhile(
+ fun (#msg_location { offset = Offset })
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if
+ %% it was then DestinationContiguousTop
+ %% would have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I
+ %% suspect that the list should be
+ %% naturally sorted as we require,
+ %% however, we need to enforce it anyway
+ end,
+ find_unremoved_messages_in_file(Destination, State1)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} =
+ open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ {ok, NewDestLocs1} =
+ copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage
+ %% from Destination, and NewDestLocs1 contains
+ %% msg_locations reflecting the compaction of
+ %% Destination so truncate Destination and copy from
+ %% Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:close(TmpHdl),
+ ok = file:delete(form_filename(Dir, Tmp)),
+ NewDestLocs1
+ end,
SourceWorkList = find_unremoved_messages_in_file(Source, State1),
- ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
- SourceHdl, DestinationHdl, Destination, State1),
+ {ok, NewSourceLocs} =
+ copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination),
%% tidy up
ok = file_handle_cache:close(SourceHdl),
ok = file_handle_cache:close(DestinationHdl),
ok = file:delete(form_filename(Dir, SourceName)),
- State1.
+ {NewDestLocs ++ NewSourceLocs, State1}.
-find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
+find_unremoved_messages_in_file(File, State = #gcstate { dir = Dir }) ->
%% Msgs here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -1039,37 +1110,41 @@ find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
end, [], Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, State) ->
- {FinalOffset, BlockStart1, BlockEnd1} =
+ Destination) ->
+ {FinalOffset, BlockStart1, BlockEnd1, NewMsgLocations} =
lists:foldl(
fun (StoreEntry = #msg_location { offset = Offset,
total_size = TotalSize },
- {CurOffset, BlockStart, BlockEnd}) ->
+ {CurOffset, BlockStart, BlockEnd, NewMsgLocs}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
- ok = index_update(StoreEntry #msg_location {
- file = Destination,
- offset = CurOffset }, State),
+ NewMsgLocs1 =
+ [StoreEntry #msg_location {
+ file = Destination,
+ offset = CurOffset } | NewMsgLocs],
NextOffset = CurOffset + TotalSize,
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {NextOffset, Offset, Offset + TotalSize};
- Offset =:= BlockEnd ->
- %% extend the current block because the next
- %% msg follows straight on
- {NextOffset, BlockStart, BlockEnd + TotalSize};
- true ->
- %% found a gap, so actually do the work for
- %% the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file_handle_cache:position(SourceHdl, BlockStart),
- {ok, BSize} = file_handle_cache:copy(
- SourceHdl, DestinationHdl, BSize),
- {NextOffset, Offset, Offset + TotalSize}
- end
- end, {InitOffset, undefined, undefined}, WorkList),
+ {BlockStart2, BlockEnd2} =
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {Offset, Offset + TotalSize};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the
+ %% next msg follows straight on
+ {BlockStart, BlockEnd + TotalSize};
+ true ->
+ %% found a gap, so actually do the work
+ %% for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file_handle_cache:position(SourceHdl,
+ BlockStart),
+ {ok, BSize} = file_handle_cache:copy(
+ SourceHdl, DestinationHdl, BSize),
+ {Offset, Offset + TotalSize}
+ end,
+ {NextOffset, BlockStart2, BlockEnd2, NewMsgLocs1}
+ end, {InitOffset, undefined, undefined, []}, WorkList),
case WorkList of
[] ->
ok;
@@ -1082,8 +1157,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
ok = file_handle_cache:sync(DestinationHdl)
end,
- ok.
+ {ok, NewMsgLocations}.
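The fold in copy_messages coalesces runs of adjacent messages so the source bytes move in as few position/copy pairs as possible. A worked example with made-up offsets, given a worklist of {offset 0, size 100}, {offset 100, size 50}, {offset 300, size 20}:

    %% msg 1: first element, block becomes 0..100
    %% msg 2: Offset 100 =:= BlockEnd, block extends to 0..150
    %% msg 3: gap found, bytes 0..150 are copied in one
    %%        file_handle_cache:copy/3 call, new block 300..320 starts
    %% after the fold, the trailing 'case WorkList of' clause flushes
    %% the final block (300..320) and syncs the destination handle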
+delete_file_if_empty(File, State = #msstate { current_file = File }) ->
+ State;
delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
sum_file_size = SumFileSize } = State) ->
[#file_summary { valid_total_size = ValidData, file_size = FileSize,
@@ -1108,6 +1185,6 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
true = ets:delete(FileSummary, File),
State1 = close_handle(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }};
- _ -> {false, State}
+ State1 #msstate { sum_file_size = SumFileSize - FileSize };
+ _ -> State
end.