diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-14 18:52:55 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-14 18:52:55 +0000 |
| commit | a31371b6e4792b24d25303af2495d166f637261b (patch) | |
| tree | 0e52007320ed3831a98abc94da67f59946674dbd /src | |
| parent | 3a746c1e5e1125891aa48490649ea7144a905463 (diff) | |
| download | rabbitmq-server-git-a31371b6e4792b24d25303af2495d166f637261b.tar.gz | |
Combine client_refs with client_msg_on_disk_callback into clients
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 140 |
1 files changed, 65 insertions, 75 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9c98324c18..ea26bc892c 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -82,11 +82,10 @@ dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table dying_clients, %% set of dying clients - client_refs, %% map of references of all registered clients - %% mapping to close_fds_fun callbacks + clients, %% map of references of all registered clients + %% to callbacks successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? - client_ondisk_callback, %% client ref to callback function mapping cref_to_guids %% client ref to synced messages mapping }). @@ -386,9 +385,9 @@ %% %% We use a separate set to keep track of the dying clients in order %% to keep that set, which is inspected on every write and remove, as -%% small as possible. Inspecting client_refs - the set of all clients -%% - would degrade performance with many healthy clients and few, if -%% any, dying clients, which is the typical case. +%% small as possible. Inspecting the set of all clients would degrade +%% performance with many healthy clients and few, if any, dying +%% clients, which is the typical case. %% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -600,11 +599,12 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, end end. -clear_client_callback(CRef, - State = #msstate { client_ondisk_callback = CODC, - cref_to_guids = CTG }) -> - State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), - cref_to_guids = dict:erase(CRef, CTG)}. +clear_client(CRef, State = #msstate { cref_to_guids = CTG, + clients = Clients, + dying_clients = DyingClients }) -> + State #msstate { cref_to_guids = dict:erase(CRef, CTG), + clients = dict:erase(CRef, Clients), + dying_clients = sets:del_element(CRef, DyingClients) }. %%---------------------------------------------------------------------------- @@ -637,7 +637,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {FileSummaryRecovered, FileSummaryEts} = recover_file_summary(AttemptFileSummaryRecovery, Dir), - {CleanShutdown, IndexState, ClientRefs1} = + {CleanShutdown, IndexState, Clients1} = recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Server), %% CleanShutdown => msg location index and file_summary both @@ -673,10 +673,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, dying_clients = sets:new(), - client_refs = ClientRefs1, + clients = Clients1, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - client_ondisk_callback = dict:new(), cref_to_guids = dict:new() }, @@ -734,22 +733,15 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - client_ondisk_callback = CODC, + clients = Clients, gc_pid = GCPid }) -> - CODC1 = case MsgOnDiskFun of - undefined -> CODC; - _ -> dict:store(CRef, MsgOnDiskFun, CODC) - end, - ClientRefs1 = dict:store(CRef, CloseFDsFun, ClientRefs), + Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = ClientRefs1, - client_ondisk_callback = CODC1 }); + State #msstate { clients = Clients1 }); -handle_call({client_terminate, CRef}, _From, - State) -> - reply(ok, clear_client_callback(CRef, State)); +handle_call({client_terminate, CRef}, _From, State) -> + reply(ok, clear_client(CRef, State)); handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), @@ -764,24 +756,18 @@ handle_cast({client_dying, CRef}, DyingClients1 = sets:add_element(CRef, DyingClients), write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); -handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs, - dying_clients = DyingClients }) -> - State1 = clear_client_callback( - CRef, State #msstate { - client_refs = dict:erase(CRef, ClientRefs), - dying_clients = sets:del_element(CRef, DyingClients) }), - noreply(remove_message(CRef, CRef, State1)); +handle_cast({client_delete, CRef}, State) -> + noreply(remove_message(CRef, CRef, clear_client(CRef, State))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts, - client_ondisk_callback = CODC, - cref_to_guids = CTG }) -> + State = #msstate { file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts, + clients = Clients, + cref_to_guids = CTG }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC), + CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, Clients), State1 = State #msstate { cref_to_guids = CTG1 }, case should_mask_action(CRef, Guid, State) of {true, _Location} -> @@ -847,10 +833,10 @@ handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - client_refs = ClientRefs }) -> + clients = Clients }) -> ok = cleanup_after_file_deletion(Source, State), %% see comment in cleanup_after_file_deletion, and client_read3 - true = mark_handle_to_close(ClientRefs, FileHandlesEts, Destination, false), + true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false), true = ets:update_element(FileSummaryEts, Destination, {#file_summary.locked, false}), State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, @@ -880,7 +866,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, + clients = Clients, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull %% out the ets tables from under it. @@ -896,7 +882,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, [ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), - store_recovery_terms([{client_refs, dict:fetch_keys(ClientRefs)}, + store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -1151,41 +1137,45 @@ orddict_store(Key, Val, Dict) -> orddict:store(Key, Val, Dict). client_confirm(CRef, Guids, ActionTaken, - State = #msstate { client_ondisk_callback = CODC, - cref_to_guids = CTG }) -> - case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids, ActionTaken), - CTG1 = case dict:find(CRef, CTG) of - {ok, Gs} -> - Guids1 = gb_sets:difference(Gs, Guids), - case gb_sets:is_empty(Guids1) of - true -> dict:erase(CRef, CTG); - false -> dict:store(CRef, Guids1, CTG) - end; - error -> CTG - end, - State #msstate { cref_to_guids = CTG1 }; - error -> State + State = #msstate { clients = Clients, + cref_to_guids = CTG }) -> + case dict:fetch(CRef, Clients) of + {undefined, _CloseFDsFun} -> + State; + {MsgOnDiskFun, _CloseFDsFun} -> + MsgOnDiskFun(Guids, ActionTaken), + CTG1 = case dict:find(CRef, CTG) of + {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), + case gb_sets:is_empty(Guids1) of + true -> dict:erase(CRef, CTG); + false -> dict:store(CRef, Guids1, CTG) + end; + error -> CTG + end, + State #msstate { cref_to_guids = CTG1 } end. -add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) -> - case dict:find(CRef, CODC) of - {ok, _} -> dict:update(CRef, - fun (Guids) -> gb_sets:add(Guid, Guids) end, - gb_sets:singleton(Guid), CTG); - error -> CTG +add_cref_to_guids_if_callback(CRef, Guid, CTG, Clients) -> + case dict:fetch(CRef, Clients) of + {undefined, _CloseFDsFun} -> + CTG; + {_MsgOnDiskFun, _CloseFDsFun} -> + dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end, + gb_sets:singleton(Guid), CTG) end. client_confirm_if_on_disk(CRef, Guid, File, - State = #msstate { client_ondisk_callback = CODC, + State = #msstate { clients = Clients, current_file = CurFile, cref_to_guids = CTG }) -> CTG1 = case File of - CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC); - _ -> case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(gb_sets:singleton(Guid), written); - _ -> ok + CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, Clients); + _ -> case dict:fetch(CRef, Clients) of + {undefined, _CloseFDsFun} -> + ok; + {MsgOnDiskFun, _CloseFDsFun} -> + MsgOnDiskFun(gb_sets:singleton(Guid), written) end, CTG end, @@ -1248,8 +1238,8 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> case ets:update_element(FileHandlesEts, Key, {2, close}) andalso Invoke of true -> case dict:fetch(Ref, ClientRefs) of - undefined -> ok; - CloseFDsFun -> ok = CloseFDsFun() + {_MsgOnDiskFun, undefined} -> ok; + {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun() end; false -> ok end @@ -1445,8 +1435,8 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> true -> case IndexModule:recover(Dir) of {ok, IndexState1} -> {true, IndexState1, - dict:from_list( - [{CRef, undefined} || CRef <- ClientRefs])}; + dict:from_list([{CRef, {undefined, undefined}} + || CRef <- ClientRefs])}; {error, Error} -> Fresh("failed to recover index: ~p", [Error]) end; @@ -1788,7 +1778,7 @@ delete_file_if_empty(File, State = #msstate { cleanup_after_file_deletion(File, #msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - client_refs = ClientRefs }) -> + clients = Clients }) -> %% Ensure that any clients that have open fhs to the file close %% them before using them again. This has to be done here (given %% it's done in the msg_store, and not the gc), and not when @@ -1796,7 +1786,7 @@ cleanup_after_file_deletion(File, %% the client could find the close, and close and reopen the fh, %% whilst the GC is waiting for readers to disappear, before it's %% actually done the GC. - true = mark_handle_to_close(ClientRefs, FileHandlesEts, File, true), + true = mark_handle_to_close(Clients, FileHandlesEts, File, true), [#file_summary { left = Left, right = Right, locked = true, |
