diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 54 |
2 files changed, 50 insertions, 41 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 66cc06cf94..169b4eeb2a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -100,10 +100,25 @@ -record(file_summary, {file, valid_total_size, left, right, file_size, locked, readers}). +-record(gc_state, + { dir, + index_module, + index_state, + file_summary_ets + }). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([gc_state/0]). + +-opaque(gc_state() :: #gc_state { dir :: file:filename(), + index_module :: atom(), + index_state :: any(), + file_summary_ets :: ets:tid() + }). + -type(server() :: pid() | atom()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { @@ -141,8 +156,7 @@ -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {ets:tid(), file:filename(), atom(), any()}) -> +-spec(gc/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> 'concurrent_readers' | non_neg_integer()). -endif. @@ -570,8 +584,12 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {ok, Offset} = file_handle_cache:position(CurHdl, Offset), ok = file_handle_cache:truncate(CurHdl), - {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, - FileSummaryEts), + {ok, GCPid} = rabbit_msg_store_gc:start_link( + #gc_state { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + file_summary_ets = FileSummaryEts + }), {ok, maybe_compact( State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), @@ -1527,7 +1545,7 @@ delete_file_if_empty(File, State = #msstate { %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> +gc(SrcFile, DstFile, State = #gc_state { file_summary_ets = FileSummaryEts }) -> [SrcObj = #file_summary { readers = SrcReaders, left = DstFile, @@ -1557,7 +1575,7 @@ combine_files(#file_summary { file = Source, #file_summary { file = Destination, valid_total_size = DestinationValid, right = Source }, - State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> + State = #gc_state { dir = Dir }) -> SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), {ok, SourceHdl} = open_file(Dir, SourceName, @@ -1606,7 +1624,9 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(SourceHdl), ExpectedSize. -load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> +load_and_vacuum_message_file(File, #gc_state { dir = Dir, + index_module = Index, + index_state = IndexState}) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), @@ -1627,7 +1647,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Destination, #gc_state { index_module = Index, + index_state = IndexState }) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index a7855bbf79..192140c313 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,20 +33,17 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, no_readers/2, stop/1]). +-export([start_link/1, gc/3, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). --record(gcstate, - {dir, - index_state, - index_module, - parent, - file_summary_ets, - scheduled +-record(state, + {parent, + scheduled, + msg_store_state }). -include("rabbit.hrl"). @@ -55,7 +52,7 @@ -ifdef(use_specs). --spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) -> +-spec(start_link/1 :: (rabbit_msg_store:gc_state()) -> rabbit_types:ok_pid_or_error()). -spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). -spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -66,10 +63,9 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> - gen_server2:start_link( - ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], - [{timeout, infinity}]). +start_link(MsgStoreState) -> + gen_server2:start_link(?MODULE, [self(), MsgStoreState], + [{timeout, infinity}]). gc(Server, Source, Destination) -> gen_server2:cast(Server, {gc, Source, Destination}). @@ -85,16 +81,12 @@ set_maximum_since_use(Pid, Age) -> %%---------------------------------------------------------------------------- -init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> +init([Parent, MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #gcstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = undefined }, - hibernate, + {ok, #state { parent = Parent, + scheduled = undefined, + msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; @@ -104,12 +96,12 @@ handle_call(stop, _From, State) -> {stop, normal, ok, State}. handle_cast({gc, Source, Destination}, - State = #gcstate { scheduled = undefined }) -> - {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), + State = #state { scheduled = undefined }) -> + {noreply, attempt_gc(State #state { scheduled = {Source, Destination} }), hibernate}; handle_cast({no_readers, File}, - State = #gcstate { scheduled = {Source, Destination} }) + State = #state { scheduled = {Source, Destination} }) when File =:= Source orelse File =:= Destination -> {noreply, attempt_gc(State), hibernate}; @@ -129,16 +121,12 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -attempt_gc(State = #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = {Source, Destination} }) -> - case rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}) of +attempt_gc(State = #state { parent = Parent, + scheduled = {Source, Destination}, + msg_store_state = MsgStoreState }) -> + case rabbit_msg_store:gc(Source, Destination, MsgStoreState) of concurrent_readers -> State; Reclaimed -> ok = rabbit_msg_store:gc_done( Parent, Reclaimed, Source, Destination), - State #gcstate { scheduled = undefined } + State #state { scheduled = undefined } end. |
