summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-10-18 16:51:25 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-10-18 16:51:25 +0100
commitfa88202e420ffbe685ddc113a2784ee42329dc57 (patch)
treec83c1d82038ce17d5007bad0ec548d2c7cd78bdf
parent1d554294023459a2b80fe8cac5ba69b90be35e0c (diff)
downloadrabbitmq-server-git-fa88202e420ffbe685ddc113a2784ee42329dc57.tar.gz
Properly abstract the state that the msg_store passes to the gc.
-rw-r--r--src/rabbit_msg_store.erl37
-rw-r--r--src/rabbit_msg_store_gc.erl54
2 files changed, 50 insertions, 41 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 66cc06cf94..169b4eeb2a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -100,10 +100,25 @@
-record(file_summary,
{file, valid_total_size, left, right, file_size, locked, readers}).
+-record(gc_state,
+ { dir,
+ index_module,
+ index_state,
+ file_summary_ets
+ }).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([gc_state/0]).
+
+-opaque(gc_state() :: #gc_state { dir :: file:filename(),
+ index_module :: atom(),
+ index_state :: any(),
+ file_summary_ets :: ets:tid()
+ }).
+
-type(server() :: pid() | atom()).
-type(file_num() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate {
@@ -141,8 +156,7 @@
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
- {ets:tid(), file:filename(), atom(), any()}) ->
+-spec(gc/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
'concurrent_readers' | non_neg_integer()).
-endif.
@@ -570,8 +584,12 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{ok, Offset} = file_handle_cache:position(CurHdl, Offset),
ok = file_handle_cache:truncate(CurHdl),
- {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule,
- FileSummaryEts),
+ {ok, GCPid} = rabbit_msg_store_gc:start_link(
+ #gc_state { dir = Dir,
+ index_module = IndexModule,
+ index_state = IndexState,
+ file_summary_ets = FileSummaryEts
+ }),
{ok, maybe_compact(
State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }),
@@ -1527,7 +1545,7 @@ delete_file_if_empty(File, State = #msstate {
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
-gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
+gc(SrcFile, DstFile, State = #gc_state { file_summary_ets = FileSummaryEts }) ->
[SrcObj = #file_summary {
readers = SrcReaders,
left = DstFile,
@@ -1557,7 +1575,7 @@ combine_files(#file_summary { file = Source,
#file_summary { file = Destination,
valid_total_size = DestinationValid,
right = Source },
- State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
+ State = #gc_state { dir = Dir }) ->
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
{ok, SourceHdl} = open_file(Dir, SourceName,
@@ -1606,7 +1624,9 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:delete(SourceHdl),
ExpectedSize.
-load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) ->
+load_and_vacuum_message_file(File, #gc_state { dir = Dir,
+ index_module = Index,
+ index_state = IndexState}) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
@@ -1627,7 +1647,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) ->
end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
+ Destination, #gc_state { index_module = Index,
+ index_state = IndexState }) ->
Copy = fun ({BlockStart, BlockEnd}) ->
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index a7855bbf79..192140c313 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -33,20 +33,17 @@
-behaviour(gen_server2).
--export([start_link/4, gc/3, no_readers/2, stop/1]).
+-export([start_link/1, gc/3, no_readers/2, stop/1]).
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/2]).
--record(gcstate,
- {dir,
- index_state,
- index_module,
- parent,
- file_summary_ets,
- scheduled
+-record(state,
+ {parent,
+ scheduled,
+ msg_store_state
}).
-include("rabbit.hrl").
@@ -55,7 +52,7 @@
-ifdef(use_specs).
--spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) ->
+-spec(start_link/1 :: (rabbit_msg_store:gc_state()) ->
rabbit_types:ok_pid_or_error()).
-spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok').
-spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -66,10 +63,9 @@
%%----------------------------------------------------------------------------
-start_link(Dir, IndexState, IndexModule, FileSummaryEts) ->
- gen_server2:start_link(
- ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts],
- [{timeout, infinity}]).
+start_link(MsgStoreState) ->
+ gen_server2:start_link(?MODULE, [self(), MsgStoreState],
+ [{timeout, infinity}]).
gc(Server, Source, Destination) ->
gen_server2:cast(Server, {gc, Source, Destination}).
@@ -85,16 +81,12 @@ set_maximum_since_use(Pid, Age) ->
%%----------------------------------------------------------------------------
-init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
+init([Parent, MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- {ok, #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- parent = Parent,
- file_summary_ets = FileSummaryEts,
- scheduled = undefined },
- hibernate,
+ {ok, #state { parent = Parent,
+ scheduled = undefined,
+ msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
@@ -104,12 +96,12 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast({gc, Source, Destination},
- State = #gcstate { scheduled = undefined }) ->
- {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }),
+ State = #state { scheduled = undefined }) ->
+ {noreply, attempt_gc(State #state { scheduled = {Source, Destination} }),
hibernate};
handle_cast({no_readers, File},
- State = #gcstate { scheduled = {Source, Destination} })
+ State = #state { scheduled = {Source, Destination} })
when File =:= Source orelse File =:= Destination ->
{noreply, attempt_gc(State), hibernate};
@@ -129,16 +121,12 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-attempt_gc(State = #gcstate { dir = Dir,
- index_state = IndexState,
- index_module = Index,
- parent = Parent,
- file_summary_ets = FileSummaryEts,
- scheduled = {Source, Destination} }) ->
- case rabbit_msg_store:gc(Source, Destination,
- {FileSummaryEts, Dir, Index, IndexState}) of
+attempt_gc(State = #state { parent = Parent,
+ scheduled = {Source, Destination},
+ msg_store_state = MsgStoreState }) ->
+ case rabbit_msg_store:gc(Source, Destination, MsgStoreState) of
concurrent_readers -> State;
Reclaimed -> ok = rabbit_msg_store:gc_done(
Parent, Reclaimed, Source, Destination),
- State #gcstate { scheduled = undefined }
+ State #state { scheduled = undefined }
end.