diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 58 |
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 3c82caab6e..841f37072d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -38,7 +38,7 @@ write/4, read/3, contains/2, remove/2, release/2, sync/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, - gc/3, has_no_readers/2]). %% internal + gc/3, has_readers/2]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -159,7 +159,7 @@ -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> non_neg_integer()). --spec(has_no_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). +-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -endif. @@ -1547,10 +1547,10 @@ delete_file_if_empty(File, State = #msstate { %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -has_no_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> +has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> [#file_summary { locked = true, readers = Count }] = ets:lookup(FileSummaryEts, File), - Count == 0. + Count /= 0. gc(SrcFile, DstFile, State = #gc_state { file_summary_ets = FileSummaryEts }) -> [SrcObj = #file_summary { diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 6e0461bb6c..3b98e1df8e 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -41,9 +41,9 @@ terminate/2, code_change/3, prioritise_cast/2]). -record(state, - {parent, - scheduled, - msg_store_state + { parent, + pending_no_readers, + msg_store_state }). -include("rabbit.hrl"). @@ -84,9 +84,9 @@ set_maximum_since_use(Pid, Age) -> init([Parent, MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #state { parent = Parent, - scheduled = undefined, - msg_store_state = MsgStoreState }, hibernate, + {ok, #state { parent = Parent, + pending_no_readers = dict:new(), + msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; @@ -95,18 +95,21 @@ prioritise_cast(_Msg, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, - State = #state { scheduled = undefined }) -> - {noreply, attempt_gc(State #state { scheduled = {Source, Destination} }), - hibernate}; +handle_cast({gc, Source, Destination}, State) -> + {noreply, attempt_gc(Source, Destination, State), hibernate}; handle_cast({no_readers, File}, - State = #state { scheduled = {Source, Destination} }) - when File =:= Source orelse File =:= Destination -> - {noreply, attempt_gc(State), hibernate}; - -handle_cast({no_readers, _File}, State) -> - {noreply, State, hibernate}; + State = #state { pending_no_readers = Pending }) -> + State1 = case dict:find(File, Pending) of + error -> + State; + {ok, {Source, Destination}} -> + attempt_gc( + Source, Destination, + State #state { pending_no_readers = + dict:erase(File, Pending) }) + end, + {noreply, State1, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -121,17 +124,20 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -attempt_gc(State = #state { parent = Parent, - scheduled = {Source, Destination}, - msg_store_state = MsgStoreState }) -> - case lists:all(fun (File) -> - rabbit_msg_store:has_no_readers(File, MsgStoreState) - end, [Source, Destination]) of - true -> +attempt_gc(Source, Destination, + State = #state { parent = Parent, + pending_no_readers = Pending, + msg_store_state = MsgStoreState }) -> + case lists:filter(fun (File) -> + rabbit_msg_store:has_readers(File, MsgStoreState) + end, [Source, Destination]) of + [] -> Reclaimed = rabbit_msg_store:gc(Source, Destination, MsgStoreState), ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination), - State #state { scheduled = undefined }; - false -> - State + State; + [File | _] -> + State #state { pending_no_readers = + dict:store(File, {Source, Destination}, Pending) + } end. |
