summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-14 00:37:29 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-14 00:37:29 +0000
commit5632d3944614e2134567386de8253e17a4a91ef6 (patch)
treeeb60b8d10d936636edd48693bd6233ab76e35391 /src
parent2d0409667e82d86c55a18acfe2fb3fb9de56260e (diff)
downloadrabbitmq-server-git-5632d3944614e2134567386de8253e17a4a91ef6.tar.gz
More testing needed, but this does seem to work
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl126
-rw-r--r--src/rabbit_msg_store_gc.erl22
-rw-r--r--src/rabbit_tests.erl20
-rw-r--r--src/rabbit_variable_queue.erl31
4 files changed, 136 insertions, 63 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f8b41ed388..e3f7cc8f53 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,8 +34,8 @@
-behaviour(gen_server2).
-export([start_link/4, successfully_recovered_state/1,
- client_init/3, client_terminate/1, client_delete_and_terminate/1,
- client_ref/1,
+ client_init/4, client_terminate/1, client_delete_and_terminate/1,
+ client_ref/1, close_all_indicated/1,
write/3, read/2, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, set_maximum_since_use/2,
@@ -82,7 +82,8 @@
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
- client_refs, %% set of references of all registered clients
+ client_refs, %% map of references of all registered clients
+ %% mapping to close_fds_fun callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
client_ondisk_callback, %% client ref to callback function mapping
@@ -111,6 +112,7 @@
index_module,
index_state,
file_summary_ets,
+ file_handles_ets,
msg_store
}).
@@ -124,6 +126,7 @@
index_module :: atom(),
index_state :: any(),
file_summary_ets :: ets:tid(),
+ file_handles_ets :: ets:tid(),
msg_store :: server()
}).
@@ -146,13 +149,15 @@
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
+-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
+-type(deletion_thunk() :: fun (() -> 'ok' | deletion_thunk())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) ->
- client_msstate()).
+-spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(),
+ maybe_close_fds_fun()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
@@ -169,8 +174,9 @@
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()).
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
- 'ok').
--spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok').
+ 'ok' | deletion_thunk()).
+-spec(delete_file/2 :: (non_neg_integer(), gc_state()) ->
+ 'ok' | deletion_thunk()).
-endif.
@@ -399,11 +405,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-client_init(Server, Ref, MsgOnDiskFun) ->
+client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun},
- infinity),
+ gen_server2:call(
+ Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -523,7 +529,8 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
- gc_pid = GCPid }) ->
+ gc_pid = GCPid,
+ client_ref = Ref }) ->
Release =
fun() -> ok = case ets:update_counter(FileSummaryEts, File,
{#file_summary.readers, -1}) of
@@ -570,7 +577,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
case index_lookup(Guid, CState) of
#msg_location { file = File } = MsgLocation ->
%% Still the same file.
- mark_handle_open(FileHandlesEts, File),
+ mark_handle_open(FileHandlesEts, File, Ref),
CState1 = close_all_indicated(CState),
{Msg, CState2} = %% This will never be the current file
@@ -684,6 +691,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
index_module = IndexModule,
index_state = IndexState,
file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
msg_store = self()
}),
@@ -694,10 +702,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref, _MODC} -> 7;
- {read, _Guid} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -713,7 +721,7 @@ prioritise_cast(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef, Callback}, _From,
+handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
@@ -724,13 +732,14 @@ handle_call({new_client_state, CRef, Callback}, _From,
client_refs = ClientRefs,
client_ondisk_callback = CODC,
gc_pid = GCPid }) ->
- CODC1 = case Callback of
+ CODC1 = case MsgOnDiskFun of
undefined -> CODC;
- _ -> dict:store(CRef, Callback, CODC)
+ _ -> dict:store(CRef, MsgOnDiskFun, CODC)
end,
+ ClientRefs1 = dict:store(CRef, CloseFDsFun, ClientRefs),
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { client_refs = sets:add_element(CRef, ClientRefs),
+ State #msstate { client_refs = ClientRefs1,
client_ondisk_callback = CODC1 });
handle_call({client_terminate, CRef}, _From,
@@ -755,7 +764,7 @@ handle_cast({client_delete, CRef},
dying_clients = DyingClients }) ->
State1 = clear_client_callback(
CRef, State #msstate {
- client_refs = sets:del_element(CRef, ClientRefs),
+ client_refs = dict:erase(CRef, ClientRefs),
dying_clients = sets:del_element(CRef, DyingClients) }),
noreply(remove_message(CRef, CRef, State1));
@@ -832,10 +841,11 @@ handle_cast(sync, State) ->
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
+ file_summary_ets = FileSummaryEts,
+ client_refs = ClientRefs }) ->
ok = cleanup_after_file_deletion(Source, State),
%% see comment in cleanup_after_file_deletion
- true = mark_handle_to_close(FileHandlesEts, Destination),
+ true = mark_handle_to_close(ClientRefs, FileHandlesEts, Destination, false),
true = ets:update_element(FileSummaryEts, Destination,
{#file_summary.locked, false}),
State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
@@ -881,7 +891,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
[ets:delete(T) ||
T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
- store_recovery_terms([{client_refs, sets:to_list(ClientRefs)},
+ store_recovery_terms([{client_refs, dict:fetch_keys(ClientRefs)},
{index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -1221,29 +1231,48 @@ close_handle(Key, FHC) ->
error -> FHC
end.
-mark_handle_open(FileHandlesEts, File) ->
+mark_handle_open(FileHandlesEts, File, Ref) ->
%% This is fine to fail (already exists)
- ets:insert_new(FileHandlesEts, {{self(), File}, open}),
+ ets:insert_new(FileHandlesEts, {{Ref, File}, open}),
true.
-mark_handle_to_close(FileHandlesEts, File) ->
- [ ets:update_element(FileHandlesEts, Key, {2, close})
- || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
+mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
+ [ begin
+ ets:update_element(FileHandlesEts, Key, {2, close}),
+ case Invoke of
+ true -> case dict:fetch(Ref, ClientRefs) of
+ undefined -> ok;
+ CloseFDsFun -> ok = CloseFDsFun()
+ end;
+ false -> ok
+ end
+ end ||
+ {{Ref, _File} = Key, open} <-
+ ets:match_object(FileHandlesEts, {{'_', File}, open}) ],
true.
-close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
+safe_file_delete(FileHandlesEts, File, Dir) ->
+ case ets:match_object(FileHandlesEts, {{'_', File}, open}, 1) of
+ {[_|_], _Cont} ->
+ fun () -> safe_file_delete(FileHandlesEts, File, Dir) end;
+ _ ->
+ ok = file:delete(form_filename(Dir, filenum_to_name(File)))
+ end.
+
+close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
+ client_ref = Ref } =
CState) ->
- Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
- lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
+ Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
+ lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
true = ets:delete(FileHandlesEts, Key),
close_handle(File, CStateM)
end, CState, Objs).
-close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
- file_handle_cache = FHC }) ->
- Self = self(),
+close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts,
+ file_handle_cache = FHC,
+ client_ref = Ref }) ->
ok = dict:fold(fun (File, Hdl, ok) ->
- true = ets:delete(FileHandlesEts, {Self, File}),
+ true = ets:delete(FileHandlesEts, {Ref, File}),
file_handle_cache:close(Hdl)
end, ok, FHC),
CState #client_msstate { file_handle_cache = dict:new() };
@@ -1381,16 +1410,16 @@ index_delete_by_file(File, #msstate { index_module = Index,
%%----------------------------------------------------------------------------
recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
- {false, IndexModule:new(Dir), sets:new()};
+ {false, IndexModule:new(Dir), dict:new()};
recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
- {false, IndexModule:new(Dir), sets:new()};
+ {false, IndexModule:new(Dir), dict:new()};
recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
"rebuilding indices from scratch~n",
[Server | ErrorArgs]),
- {false, IndexModule:new(Dir), sets:new()}
+ {false, IndexModule:new(Dir), dict:new()}
end,
case read_recovery_terms(Dir) of
{false, Error} ->
@@ -1403,7 +1432,8 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
{true, IndexState1,
- sets:from_list(ClientRefs)};
+ dict:from_list(
+ [{CRef, undefined} || CRef <- ClientRefs])};
{error, Error} ->
Fresh("failed to recover index: ~p", [Error])
end;
@@ -1744,7 +1774,8 @@ delete_file_if_empty(File, State = #msstate {
cleanup_after_file_deletion(File,
#msstate { file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
+ file_summary_ets = FileSummaryEts,
+ client_refs = ClientRefs }) ->
%% Ensure that any clients that have open fhs to the file close
%% them before using them again. This has to be done here (given
%% it's done in the msg_store, and not the gc), and not when
@@ -1752,7 +1783,7 @@ cleanup_after_file_deletion(File,
%% the client could find the close, and close and reopen the fh,
%% whilst the GC is waiting for readers to disappear, before it's
%% actually done the GC.
- true = mark_handle_to_close(FileHandlesEts, File),
+ true = mark_handle_to_close(ClientRefs, FileHandlesEts, File, true),
[#file_summary { left = Left,
right = Right,
locked = true,
@@ -1781,6 +1812,7 @@ has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) ->
combine_files(Source, Destination,
State = #gc_state { file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
dir = Dir,
msg_store = Server }) ->
[#file_summary {
@@ -1841,7 +1873,7 @@ combine_files(Source, Destination,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
ok = file_handle_cache:close(DestinationHdl),
- ok = file_handle_cache:delete(SourceHdl),
+ ok = file_handle_cache:close(SourceHdl),
%% don't update dest.right, because it could be changing at the
%% same time
@@ -1851,9 +1883,11 @@ combine_files(Source, Destination,
{#file_summary.file_size, TotalValidData}]),
Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData,
- gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}).
+ gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}),
+ safe_file_delete(FileHandlesEts, Source, Dir).
delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
+ file_handles_ets = FileHandlesEts,
dir = Dir,
msg_store = Server }) ->
[#file_summary { valid_total_size = 0,
@@ -1861,8 +1895,8 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
file_size = FileSize,
readers = 0 }] = ets:lookup(FileSummaryEts, File),
{[], 0} = load_and_vacuum_message_file(File, State),
- ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- gen_server2:cast(Server, {delete_file, File, FileSize}).
+ gen_server2:cast(Server, {delete_file, File, FileSize}),
+ safe_file_delete(FileHandlesEts, File, Dir).
load_and_vacuum_message_file(File, #gc_state { dir = Dir,
index_module = Index,
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index cd9fd4973f..0baf992b9d 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -42,6 +42,7 @@
-record(state,
{ pending_no_readers,
+ on_action,
msg_store_state
}).
@@ -89,6 +90,7 @@ init([MsgStoreState]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
{ok, #state { pending_no_readers = dict:new(),
+ on_action = [],
msg_store_state = MsgStoreState }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -131,16 +133,30 @@ code_change(_OldVsn, State, _Extra) ->
attempt_action(Action, Files,
State = #state { pending_no_readers = Pending,
+ on_action = Thunks,
msg_store_state = MsgStoreState }) ->
+ Thunks1 = run_thunks(Thunks),
case [File || File <- Files,
rabbit_msg_store:has_readers(File, MsgStoreState)] of
- [] -> do_action(Action, Files, MsgStoreState),
- State;
+ [] -> Thunks2 = case do_action(Action, Files, MsgStoreState) of
+ ok -> Thunks1;
+ Thunk -> [Thunk | Thunks1]
+ end,
+ State #state { on_action = Thunks2 };
[File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending),
- State #state { pending_no_readers = Pending1 }
+ State #state { pending_no_readers = Pending1,
+ on_action = Thunks1 }
end.
do_action(combine, [Source, Destination], MsgStoreState) ->
rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
do_action(delete, [File], MsgStoreState) ->
rabbit_msg_store:delete_file(File, MsgStoreState).
+
+run_thunks(Thunks) ->
+ lists:foldl(fun (Thunk, Thunks1) ->
+ case Thunk() of
+ ok -> Thunks1;
+ Thunk1 -> [Thunk1 | Thunks1]
+ end
+ end, [], Thunks).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d913092cce..befcfd99ad 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1502,12 +1502,13 @@ msg_store_remove(MsgStore, Ref, Guids) ->
with_msg_store_client(MsgStore, Ref, Fun) ->
rabbit_msg_store:client_terminate(
- Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))).
+ Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined))).
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
- lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)).
+ lists:foldl(
+ fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
+ rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined), L)).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1516,7 +1517,7 @@ test_msg_store() ->
{Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
Ref = rabbit_guid:guid(),
MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ undefined, undefined),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, Guids, MSCState),
%% publish the first half
@@ -1583,7 +1584,7 @@ test_msg_store() ->
{Guid, 0, GuidsTail}
end, Guids2ndHalf}),
MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ undefined, undefined),
%% check we have the right msgs left
lists:foldl(
fun (Guid, Bool) ->
@@ -1593,7 +1594,7 @@ test_msg_store() ->
%% restart empty
restart_msg_store_empty(),
MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ undefined, undefined),
%% check we don't contain any of the msgs
false = msg_store_contains(false, Guids, MSCState6),
%% publish the first half again
@@ -1602,7 +1603,7 @@ test_msg_store() ->
ok = rabbit_msg_store:client_terminate(
msg_store_read(Guids1stHalf, MSCState6)),
MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
- undefined),
+ undefined, undefined),
ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% restart empty
@@ -1661,7 +1662,7 @@ init_test_queue() ->
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef, undefined),
+ PRef, undefined, undefined),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
fun (Guid) ->
@@ -1695,7 +1696,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
end,
- MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
+ MSCState =
+ rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined),
{A, B = [{_SeqId, LastGuidWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 665cac96d9..8c048575a8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -436,10 +436,12 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms};
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
- PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef, MsgOnDiskFun),
- TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE,
- TRef, undefined),
+ PersistentClient = rabbit_msg_store:client_init(
+ ?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun,
+ msg_store_close_fds_fun(true)),
+ TransientClient = rabbit_msg_store:client_init(
+ ?TRANSIENT_MSG_STORE, TRef, undefined,
+ msg_store_close_fds_fun(false)),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
@@ -933,7 +935,9 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun) ->
- rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
+ rabbit_msg_store:client_init(
+ MsgStore, rabbit_guid:guid(), MsgOnDiskFun,
+ msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)).
msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
with_immutable_msg_store_state(
@@ -960,6 +964,23 @@ msg_store_sync(MSCState, IsPersistent, Guids, Callback) ->
MSCState, IsPersistent,
fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end).
+msg_store_close_fds(MSCState, IsPersistent) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
+
+msg_store_close_fds_fun(IsPersistent) ->
+ Self = self(),
+ fun () ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ Self,
+ fun (State = #vqstate { msg_store_clients = MSCState }) ->
+ {[], State #vqstate { msg_store_clients =
+ msg_store_close_fds(
+ MSCState, IsPersistent) }}
+ end)
+ end.
+
maybe_write_delivered(false, _SeqId, IndexState) ->
IndexState;
maybe_write_delivered(true, SeqId, IndexState) ->