summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl47
-rw-r--r--src/rabbit_msg_store_gc.erl16
2 files changed, 42 insertions, 21 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 841f37072d..05fd1741a7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -38,7 +38,7 @@
write/4, read/3, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2,
- gc/3, has_readers/2]). %% internal
+ gc/3, delete_file/2, has_readers/2]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -159,6 +159,7 @@
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
non_neg_integer()).
+-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-endif.
@@ -1483,7 +1484,8 @@ maybe_compact(State) ->
find_files_to_gc(FileSummaryEts, FileSizeLimit,
[#file_summary { file = Dst,
valid_total_size = DstValid,
- right = Src }]) ->
+ right = Src,
+ locked = DstLocked }]) ->
case Src of
undefined ->
not_found;
@@ -1491,11 +1493,14 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit,
[#file_summary { file = Src,
valid_total_size = SrcValid,
left = Dst,
- right = SrcRight }] = Next =
+ right = SrcRight,
+ locked = SrcLocked }] = Next =
ets:lookup(FileSummaryEts, Src),
case SrcRight of
undefined -> not_found;
- _ -> case DstValid + SrcValid =< FileSizeLimit of
+ _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso
+ (DstValid > 0) andalso (SrcValid > 0) andalso
+ not (DstLocked orelse SrcLocked) of
true -> {Src, Dst};
false -> find_files_to_gc(
FileSummaryEts, FileSizeLimit, Next)
@@ -1506,7 +1511,7 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit,
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
delete_file_if_empty(File, State = #msstate {
- dir = Dir,
+ gc_pid = GCPid,
sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts }) ->
@@ -1517,14 +1522,19 @@ delete_file_if_empty(File, State = #msstate {
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- %% we should NEVER find the current file in here hence right
- %% should always be a file, not undefined
- 0 -> case {Left, Right} of
+ 0 -> true = ets:update_element(FileSummaryEts, File,
+ {#file_summary.locked, true}),
+ %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ ok = rabbit_msg_store_gc:delete(GCPid, File),
+ %% we should NEVER find the current file in here hence
+ %% right should always be a file, not undefined
+ case {Left, Right} of
{undefined, _} when Right =/= undefined ->
%% the eldest file is empty.
- true = ets:update_element(
- FileSummaryEts, Right,
- {#file_summary.left, undefined});
+ true = ets:update_element(FileSummaryEts, Right,
+ {#file_summary.left, undefined});
{_, _} when Right =/= undefined ->
true = ets:update_element(FileSummaryEts, Right,
{#file_summary.left, Left}),
@@ -1532,13 +1542,7 @@ delete_file_if_empty(File, State = #msstate {
{#file_summary.right, Right})
end,
true = mark_handle_to_close(FileHandlesEts, File),
- true = ets:delete(FileSummaryEts, File),
- {ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
- [index_delete(Guid, State) ||
- {Guid, _TotalSize, _Offset} <- Messages],
State1 = close_handle(File, State),
- ok = file:delete(form_filename(Dir, filenum_to_name(File))),
State1 #msstate { sum_file_size = SumFileSize - FileSize };
_ -> State
end.
@@ -1552,6 +1556,15 @@ has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
ets:lookup(FileSummaryEts, File),
Count /= 0.
+delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
+ dir = Dir }) ->
+ [#file_summary { valid_total_size = 0,
+ locked = true,
+ readers = 0 }] = ets:lookup(FileSummaryEts, File),
+ {[], 0} = load_and_vacuum_message_file(File, State),
+ true = ets:delete(FileSummaryEts, File),
+ ok = file:delete(form_filename(Dir, filenum_to_name(File))).
+
gc(SrcFile, DstFile, State = #gc_state { file_summary_ets = FileSummaryEts }) ->
[SrcObj = #file_summary {
readers = 0,
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 37b82cef2b..008de53508 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/1, gc/3, no_readers/2, stop/1]).
+-export([start_link/1, gc/3, delete/2, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
@@ -55,6 +55,7 @@
-spec(start_link/1 :: (rabbit_msg_store:gc_state()) ->
rabbit_types:ok_pid_or_error()).
-spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
+-spec(delete/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(stop/1 :: (pid()) -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -68,7 +69,10 @@ start_link(MsgStoreState) ->
[{timeout, infinity}]).
gc(Server, Source, Destination) ->
- gen_server2:cast(Server, {gc, Source, Destination}).
+ gen_server2:cast(Server, {gc, [Source, Destination]}).
+
+delete(Server, File) ->
+ gen_server2:cast(Server, {delete, [File]}).
no_readers(Server, File) ->
gen_server2:cast(Server, {no_readers, File}).
@@ -95,8 +99,9 @@ prioritise_cast(_Msg, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
-handle_cast({gc, Source, Destination}, State) ->
- {noreply, attempt_action(gc, [Source, Destination], State), hibernate};
+handle_cast({Action, Files}, State)
+ when is_list(Files) andalso (Action =:= gc orelse Action =:= delete) ->
+ {noreply, attempt_action(Action, Files, State), hibernate};
handle_cast({no_readers, File},
State = #state { pending_no_readers = Pending }) ->
@@ -141,4 +146,7 @@ do_action(gc, [Source, Destination],
msg_store_state = MsgStoreState }) ->
Reclaimed = rabbit_msg_store:gc(Source, Destination, MsgStoreState),
ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination),
+ State;
+do_action(delete, [File], State = #state { msg_store_state = MsgStoreState }) ->
+ ok = rabbit_msg_store:delete_file(File, MsgStoreState),
State.