diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 49 |
1 files changed, 22 insertions, 27 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index cb4768bdbb..7356584758 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -363,8 +363,7 @@ set_maximum_since_use(Server, Age) -> client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), - register_sync_callback(Server, Ref, MsgOnDiskFun), + gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, infinity), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -387,9 +386,6 @@ client_delete_and_terminate(CState, Server, Ref) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -register_sync_callback(Server, ClientRef, Fun) -> - gen_server2:call(Server, {register_sync_callback, ClientRef, Fun}, infinity). - %%---------------------------------------------------------------------------- %% Client-side-only helpers %%---------------------------------------------------------------------------- @@ -591,10 +587,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - {new_client_state, _Ref} -> 7; - successfully_recovered_state -> 7; - {read, _Guid} -> 2; - _ -> 0 + {new_client_state, _Ref, _MODC} -> 7; + successfully_recovered_state -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -613,30 +609,29 @@ handle_call({contains, Guid}, From, State) -> State1 = contains_message(Guid, From, State), noreply(State1); -handle_call({new_client_state, CRef}, _From, - State = #msstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - gc_pid = GCPid }) -> +handle_call({new_client_state, CRef, Callback}, _From, + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + client_ondisk_callback = CODC, + gc_pid = GCPid }) -> + CODC1 = case Callback of + undefined -> CODC; + _ -> dict:store(CRef, Callback, CODC) + end, reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + State #msstate { client_refs = sets:add_element(CRef, ClientRefs), + client_ondisk_callback = CODC1 }); handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({register_sync_callback, _ , undefined}, _From, State) -> - reply(ok, State); -handle_call({register_sync_callback, ClientRef, Fun}, _From, - State = #msstate { client_ondisk_callback = CODC }) -> - reply(ok, State #msstate { client_ondisk_callback = - dict:store(ClientRef, Fun, CODC) }); - handle_call({client_terminate, #client_msstate { client_ref = CRef }}, _From, State = #msstate { client_ondisk_callback = CODC, cref_to_guids = CTG }) -> |
