summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl49
1 files changed, 22 insertions, 27 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index cb4768bdbb..7356584758 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -363,8 +363,7 @@ set_maximum_since_use(Server, Age) ->
client_init(Server, Ref, MsgOnDiskFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
- register_sync_callback(Server, Ref, MsgOnDiskFun),
+ gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -387,9 +386,6 @@ client_delete_and_terminate(CState, Server, Ref) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-register_sync_callback(Server, ClientRef, Fun) ->
- gen_server2:call(Server, {register_sync_callback, ClientRef, Fun}, infinity).
-
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
@@ -591,10 +587,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- {new_client_state, _Ref} -> 7;
- successfully_recovered_state -> 7;
- {read, _Guid} -> 2;
- _ -> 0
+ {new_client_state, _Ref, _MODC} -> 7;
+ successfully_recovered_state -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -613,30 +609,29 @@ handle_call({contains, Guid}, From, State) ->
State1 = contains_message(Guid, From, State),
noreply(State1);
-handle_call({new_client_state, CRef}, _From,
- State = #msstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
- gc_pid = GCPid }) ->
+handle_call({new_client_state, CRef, Callback}, _From,
+ State = #msstate { dir = Dir,
+ index_state = IndexState,
+ index_module = IndexModule,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_refs = ClientRefs,
+ client_ondisk_callback = CODC,
+ gc_pid = GCPid }) ->
+ CODC1 = case Callback of
+ undefined -> CODC;
+ _ -> dict:store(CRef, Callback, CODC)
+ end,
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
+ State #msstate { client_refs = sets:add_element(CRef, ClientRefs),
+ client_ondisk_callback = CODC1 });
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({register_sync_callback, _ , undefined}, _From, State) ->
- reply(ok, State);
-handle_call({register_sync_callback, ClientRef, Fun}, _From,
- State = #msstate { client_ondisk_callback = CODC }) ->
- reply(ok, State #msstate { client_ondisk_callback =
- dict:store(ClientRef, Fun, CODC) });
-
handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->