summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl8
-rw-r--r--src/rabbit_msg_store_gc.erl58
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 3c82caab6e..841f37072d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -38,7 +38,7 @@
write/4, read/3, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2,
- gc/3, has_no_readers/2]). %% internal
+ gc/3, has_readers/2]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -159,7 +159,7 @@
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
non_neg_integer()).
--spec(has_no_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
+-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-endif.
@@ -1547,10 +1547,10 @@ delete_file_if_empty(File, State = #msstate {
%% garbage collection / compaction / aggregation -- external
%%----------------------------------------------------------------------------
-has_no_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
+has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
[#file_summary { locked = true, readers = Count }] =
ets:lookup(FileSummaryEts, File),
- Count == 0.
+ Count /= 0.
gc(SrcFile, DstFile, State = #gc_state { file_summary_ets = FileSummaryEts }) ->
[SrcObj = #file_summary {
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 6e0461bb6c..3b98e1df8e 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -41,9 +41,9 @@
terminate/2, code_change/3, prioritise_cast/2]).
-record(state,
- {parent,
- scheduled,
- msg_store_state
+ { parent,
+ pending_no_readers,
+ msg_store_state
}).
-include("rabbit.hrl").
@@ -84,9 +84,9 @@ set_maximum_since_use(Pid, Age) ->
init([Parent, MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- {ok, #state { parent = Parent,
- scheduled = undefined,
- msg_store_state = MsgStoreState }, hibernate,
+ {ok, #state { parent = Parent,
+ pending_no_readers = dict:new(),
+ msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
@@ -95,18 +95,21 @@ prioritise_cast(_Msg, _State) -> 0.
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
-handle_cast({gc, Source, Destination},
- State = #state { scheduled = undefined }) ->
- {noreply, attempt_gc(State #state { scheduled = {Source, Destination} }),
- hibernate};
+handle_cast({gc, Source, Destination}, State) ->
+ {noreply, attempt_gc(Source, Destination, State), hibernate};
handle_cast({no_readers, File},
- State = #state { scheduled = {Source, Destination} })
- when File =:= Source orelse File =:= Destination ->
- {noreply, attempt_gc(State), hibernate};
-
-handle_cast({no_readers, _File}, State) ->
- {noreply, State, hibernate};
+ State = #state { pending_no_readers = Pending }) ->
+ State1 = case dict:find(File, Pending) of
+ error ->
+ State;
+ {ok, {Source, Destination}} ->
+ attempt_gc(
+ Source, Destination,
+ State #state { pending_no_readers =
+ dict:erase(File, Pending) })
+ end,
+ {noreply, State1, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -121,17 +124,20 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-attempt_gc(State = #state { parent = Parent,
- scheduled = {Source, Destination},
- msg_store_state = MsgStoreState }) ->
- case lists:all(fun (File) ->
- rabbit_msg_store:has_no_readers(File, MsgStoreState)
- end, [Source, Destination]) of
- true ->
+attempt_gc(Source, Destination,
+ State = #state { parent = Parent,
+ pending_no_readers = Pending,
+ msg_store_state = MsgStoreState }) ->
+ case lists:filter(fun (File) ->
+ rabbit_msg_store:has_readers(File, MsgStoreState)
+ end, [Source, Destination]) of
+ [] ->
Reclaimed = rabbit_msg_store:gc(Source, Destination, MsgStoreState),
ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source,
Destination),
- State #state { scheduled = undefined };
- false ->
- State
+ State;
+ [File | _] ->
+ State #state { pending_no_readers =
+ dict:store(File, {Source, Destination}, Pending)
+ }
end.