diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 16 |
2 files changed, 42 insertions, 21 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 841f37072d..05fd1741a7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -38,7 +38,7 @@ write/4, read/3, contains/2, remove/2, release/2, sync/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, - gc/3, has_readers/2]). %% internal + gc/3, delete_file/2, has_readers/2]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -159,6 +159,7 @@ -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> non_neg_integer()). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -endif. @@ -1483,7 +1484,8 @@ maybe_compact(State) -> find_files_to_gc(FileSummaryEts, FileSizeLimit, [#file_summary { file = Dst, valid_total_size = DstValid, - right = Src }]) -> + right = Src, + locked = DstLocked }]) -> case Src of undefined -> not_found; @@ -1491,11 +1493,14 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, [#file_summary { file = Src, valid_total_size = SrcValid, left = Dst, - right = SrcRight }] = Next = + right = SrcRight, + locked = SrcLocked }] = Next = ets:lookup(FileSummaryEts, Src), case SrcRight of undefined -> not_found; - _ -> case DstValid + SrcValid =< FileSizeLimit of + _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso + (DstValid > 0) andalso (SrcValid > 0) andalso + not (DstLocked orelse SrcLocked) of true -> {Src, Dst}; false -> find_files_to_gc( FileSummaryEts, FileSizeLimit, Next) @@ -1506,7 +1511,7 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, State = #msstate { - dir = Dir, + gc_pid = GCPid, sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts }) -> @@ -1517,14 +1522,19 @@ delete_file_if_empty(File, State = #msstate { locked = false }] = ets:lookup(FileSummaryEts, File), case ValidData of - %% we should NEVER find the current file in here hence right - %% should always be a file, not undefined - 0 -> case {Left, Right} of + 0 -> true = ets:update_element(FileSummaryEts, File, + {#file_summary.locked, true}), + %% don't delete the file_summary_ets entry for File here + %% because we could have readers which need to be able to + %% decrement the readers count. + ok = rabbit_msg_store_gc:delete(GCPid, File), + %% we should NEVER find the current file in here hence + %% right should always be a file, not undefined + case {Left, Right} of {undefined, _} when Right =/= undefined -> %% the eldest file is empty. - true = ets:update_element( - FileSummaryEts, Right, - {#file_summary.left, undefined}); + true = ets:update_element(FileSummaryEts, Right, + {#file_summary.left, undefined}); {_, _} when Right =/= undefined -> true = ets:update_element(FileSummaryEts, Right, {#file_summary.left, Left}), @@ -1532,13 +1542,7 @@ delete_file_if_empty(File, State = #msstate { {#file_summary.right, Right}) end, true = mark_handle_to_close(FileHandlesEts, File), - true = ets:delete(FileSummaryEts, File), - {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - [index_delete(Guid, State) || - {Guid, _TotalSize, _Offset} <- Messages], State1 = close_handle(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), State1 #msstate { sum_file_size = SumFileSize - FileSize }; _ -> State end. @@ -1552,6 +1556,15 @@ has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> ets:lookup(FileSummaryEts, File), Count /= 0. +delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir }) -> + [#file_summary { valid_total_size = 0, + locked = true, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + {[], 0} = load_and_vacuum_message_file(File, State), + true = ets:delete(FileSummaryEts, File), + ok = file:delete(form_filename(Dir, filenum_to_name(File))). + gc(SrcFile, DstFile, State = #gc_state { file_summary_ets = FileSummaryEts }) -> [SrcObj = #file_summary { readers = 0, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 37b82cef2b..008de53508 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/1, gc/3, no_readers/2, stop/1]). +-export([start_link/1, gc/3, delete/2, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). @@ -55,6 +55,7 @@ -spec(start_link/1 :: (rabbit_msg_store:gc_state()) -> rabbit_types:ok_pid_or_error()). -spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). +-spec(delete/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(stop/1 :: (pid()) -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -68,7 +69,10 @@ start_link(MsgStoreState) -> [{timeout, infinity}]). gc(Server, Source, Destination) -> - gen_server2:cast(Server, {gc, Source, Destination}). + gen_server2:cast(Server, {gc, [Source, Destination]}). + +delete(Server, File) -> + gen_server2:cast(Server, {delete, [File]}). no_readers(Server, File) -> gen_server2:cast(Server, {no_readers, File}). @@ -95,8 +99,9 @@ prioritise_cast(_Msg, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, State) -> - {noreply, attempt_action(gc, [Source, Destination], State), hibernate}; +handle_cast({Action, Files}, State) + when is_list(Files) andalso (Action =:= gc orelse Action =:= delete) -> + {noreply, attempt_action(Action, Files, State), hibernate}; handle_cast({no_readers, File}, State = #state { pending_no_readers = Pending }) -> @@ -141,4 +146,7 @@ do_action(gc, [Source, Destination], msg_store_state = MsgStoreState }) -> Reclaimed = rabbit_msg_store:gc(Source, Destination, MsgStoreState), ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination), + State; +do_action(delete, [File], State = #state { msg_store_state = MsgStoreState }) -> + ok = rabbit_msg_store:delete_file(File, MsgStoreState), State. |
