summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-14 18:52:55 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-14 18:52:55 +0000
commita31371b6e4792b24d25303af2495d166f637261b (patch)
tree0e52007320ed3831a98abc94da67f59946674dbd /src
parent3a746c1e5e1125891aa48490649ea7144a905463 (diff)
downloadrabbitmq-server-git-a31371b6e4792b24d25303af2495d166f637261b.tar.gz
Combine client_refs with client_msg_on_disk_callback into clients
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl140
1 files changed, 65 insertions, 75 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9c98324c18..ea26bc892c 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -82,11 +82,10 @@
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
- client_refs, %% map of references of all registered clients
- %% mapping to close_fds_fun callbacks
+ clients, %% map of references of all registered clients
+ %% to callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- client_ondisk_callback, %% client ref to callback function mapping
cref_to_guids %% client ref to synced messages mapping
}).
@@ -386,9 +385,9 @@
%%
%% We use a separate set to keep track of the dying clients in order
%% to keep that set, which is inspected on every write and remove, as
-%% small as possible. Inspecting client_refs - the set of all clients
-%% - would degrade performance with many healthy clients and few, if
-%% any, dying clients, which is the typical case.
+%% small as possible. Inspecting the set of all clients would degrade
+%% performance with many healthy clients and few, if any, dying
+%% clients, which is the typical case.
%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -600,11 +599,12 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
end
end.
-clear_client_callback(CRef,
- State = #msstate { client_ondisk_callback = CODC,
- cref_to_guids = CTG }) ->
- State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
- cref_to_guids = dict:erase(CRef, CTG)}.
+clear_client(CRef, State = #msstate { cref_to_guids = CTG,
+ clients = Clients,
+ dying_clients = DyingClients }) ->
+ State #msstate { cref_to_guids = dict:erase(CRef, CTG),
+ clients = dict:erase(CRef, Clients),
+ dying_clients = sets:del_element(CRef, DyingClients) }.
%%----------------------------------------------------------------------------
@@ -637,7 +637,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
{FileSummaryRecovered, FileSummaryEts} =
recover_file_summary(AttemptFileSummaryRecovery, Dir),
- {CleanShutdown, IndexState, ClientRefs1} =
+ {CleanShutdown, IndexState, Clients1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Server),
%% CleanShutdown => msg location index and file_summary both
@@ -673,10 +673,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
- client_refs = ClientRefs1,
+ clients = Clients1,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- client_ondisk_callback = dict:new(),
cref_to_guids = dict:new()
},
@@ -734,22 +733,15 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
- client_ondisk_callback = CODC,
+ clients = Clients,
gc_pid = GCPid }) ->
- CODC1 = case MsgOnDiskFun of
- undefined -> CODC;
- _ -> dict:store(CRef, MsgOnDiskFun, CODC)
- end,
- ClientRefs1 = dict:store(CRef, CloseFDsFun, ClientRefs),
+ Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { client_refs = ClientRefs1,
- client_ondisk_callback = CODC1 });
+ State #msstate { clients = Clients1 });
-handle_call({client_terminate, CRef}, _From,
- State) ->
- reply(ok, clear_client_callback(CRef, State));
+handle_call({client_terminate, CRef}, _From, State) ->
+ reply(ok, clear_client(CRef, State));
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
@@ -764,24 +756,18 @@ handle_cast({client_dying, CRef},
DyingClients1 = sets:add_element(CRef, DyingClients),
write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
-handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs,
- dying_clients = DyingClients }) ->
- State1 = clear_client_callback(
- CRef, State #msstate {
- client_refs = dict:erase(CRef, ClientRefs),
- dying_clients = sets:del_element(CRef, DyingClients) }),
- noreply(remove_message(CRef, CRef, State1));
+handle_cast({client_delete, CRef}, State) ->
+ noreply(remove_message(CRef, CRef, clear_client(CRef, State)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts,
- client_ondisk_callback = CODC,
- cref_to_guids = CTG }) ->
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ clients = Clients,
+ cref_to_guids = CTG }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC),
+ CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, Clients),
State1 = State #msstate { cref_to_guids = CTG1 },
case should_mask_action(CRef, Guid, State) of
{true, _Location} ->
@@ -847,10 +833,10 @@ handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- client_refs = ClientRefs }) ->
+ clients = Clients }) ->
ok = cleanup_after_file_deletion(Source, State),
%% see comment in cleanup_after_file_deletion, and client_read3
- true = mark_handle_to_close(ClientRefs, FileHandlesEts, Destination, false),
+ true = mark_handle_to_close(Clients, FileHandlesEts, Destination, false),
true = ets:update_element(FileSummaryEts, Destination,
{#file_summary.locked, false}),
State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed },
@@ -880,7 +866,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
+ clients = Clients,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
%% out the ets tables from under it.
@@ -896,7 +882,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
[ets:delete(T) ||
T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
- store_recovery_terms([{client_refs, dict:fetch_keys(ClientRefs)},
+ store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -1151,41 +1137,45 @@ orddict_store(Key, Val, Dict) ->
orddict:store(Key, Val, Dict).
client_confirm(CRef, Guids, ActionTaken,
- State = #msstate { client_ondisk_callback = CODC,
- cref_to_guids = CTG }) ->
- case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids, ActionTaken),
- CTG1 = case dict:find(CRef, CTG) of
- {ok, Gs} ->
- Guids1 = gb_sets:difference(Gs, Guids),
- case gb_sets:is_empty(Guids1) of
- true -> dict:erase(CRef, CTG);
- false -> dict:store(CRef, Guids1, CTG)
- end;
- error -> CTG
- end,
- State #msstate { cref_to_guids = CTG1 };
- error -> State
+ State = #msstate { clients = Clients,
+ cref_to_guids = CTG }) ->
+ case dict:fetch(CRef, Clients) of
+ {undefined, _CloseFDsFun} ->
+ State;
+ {MsgOnDiskFun, _CloseFDsFun} ->
+ MsgOnDiskFun(Guids, ActionTaken),
+ CTG1 = case dict:find(CRef, CTG) of
+ {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids),
+ case gb_sets:is_empty(Guids1) of
+ true -> dict:erase(CRef, CTG);
+ false -> dict:store(CRef, Guids1, CTG)
+ end;
+ error -> CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 }
end.
-add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) ->
- case dict:find(CRef, CODC) of
- {ok, _} -> dict:update(CRef,
- fun (Guids) -> gb_sets:add(Guid, Guids) end,
- gb_sets:singleton(Guid), CTG);
- error -> CTG
+add_cref_to_guids_if_callback(CRef, Guid, CTG, Clients) ->
+ case dict:fetch(CRef, Clients) of
+ {undefined, _CloseFDsFun} ->
+ CTG;
+ {_MsgOnDiskFun, _CloseFDsFun} ->
+ dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end,
+ gb_sets:singleton(Guid), CTG)
end.
client_confirm_if_on_disk(CRef, Guid, File,
- State = #msstate { client_ondisk_callback = CODC,
+ State = #msstate { clients = Clients,
current_file = CurFile,
cref_to_guids = CTG }) ->
CTG1 =
case File of
- CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC);
- _ -> case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(gb_sets:singleton(Guid), written);
- _ -> ok
+ CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, Clients);
+ _ -> case dict:fetch(CRef, Clients) of
+ {undefined, _CloseFDsFun} ->
+ ok;
+ {MsgOnDiskFun, _CloseFDsFun} ->
+ MsgOnDiskFun(gb_sets:singleton(Guid), written)
end,
CTG
end,
@@ -1248,8 +1238,8 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
case ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke of
true -> case dict:fetch(Ref, ClientRefs) of
- undefined -> ok;
- CloseFDsFun -> ok = CloseFDsFun()
+ {_MsgOnDiskFun, undefined} -> ok;
+ {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
end;
false -> ok
end
@@ -1445,8 +1435,8 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
true -> case IndexModule:recover(Dir) of
{ok, IndexState1} ->
{true, IndexState1,
- dict:from_list(
- [{CRef, undefined} || CRef <- ClientRefs])};
+ dict:from_list([{CRef, {undefined, undefined}}
+ || CRef <- ClientRefs])};
{error, Error} ->
Fresh("failed to recover index: ~p", [Error])
end;
@@ -1788,7 +1778,7 @@ delete_file_if_empty(File, State = #msstate {
cleanup_after_file_deletion(File,
#msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- client_refs = ClientRefs }) ->
+ clients = Clients }) ->
%% Ensure that any clients that have open fhs to the file close
%% them before using them again. This has to be done here (given
%% it's done in the msg_store, and not the gc), and not when
@@ -1796,7 +1786,7 @@ cleanup_after_file_deletion(File,
%% the client could find the close, and close and reopen the fh,
%% whilst the GC is waiting for readers to disappear, before it's
%% actually done the GC.
- true = mark_handle_to_close(ClientRefs, FileHandlesEts, File, true),
+ true = mark_handle_to_close(Clients, FileHandlesEts, File, true),
[#file_summary { left = Left,
right = Right,
locked = true,