summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-06-21 15:24:05 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-06-21 15:24:05 +0100
commit2e1ae067bef44838e191647d4c1c1b69c183b10c (patch)
tree3a85c348349d4b694bd5264413f49820ca792b4f
parent81ff300a43640acd7a6c36f2dc31a73f8dc05bb2 (diff)
downloadrabbitmq-server-git-2e1ae067bef44838e191647d4c1c1b69c183b10c.tar.gz
Use separate index to store dying clients file offsets
-rw-r--r--src/rabbit_msg_store.erl40
1 files changed, 32 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2300664687..68f346ca70 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -91,6 +91,8 @@
flying_ets,
%% set of dying clients
dying_clients,
+ %% index of file positions for client death messages
+ dying_client_index,
%% map of references of all registered clients
%% to callbacks
clients,
@@ -131,6 +133,12 @@
msg_store
}).
+-record(dying_client,
+ { client_ref,
+ file,
+ offset
+ }).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -420,6 +428,10 @@
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%%
+%% Client termination messages are stored in a separate ets index to
+%% avoid filling primary message store index and message files with
+%% client termination messages.
+%%
%% When the msg_store has a backlog (i.e. it has unprocessed messages
%% in its mailbox / gen_server priority queue), a further optimisation
%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
@@ -691,7 +703,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
end.
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
- dying_clients = DyingClients }) ->
+ dying_clients = DyingClients,
+ dying_client_index = DyingIndex }) ->
+ true = ets:delete(DyingIndex, CRef),
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
dying_clients = sets:del_element(CRef, DyingClients) }.
@@ -745,6 +759,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
+ DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
+ [set, public, {keypos, #dying_client.client_ref}]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
@@ -776,6 +792,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
dying_clients = sets:new(),
+ dying_client_index = DyingIndex,
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -852,15 +869,21 @@ handle_call({contains, MsgId}, From, State) ->
noreply(State1).
handle_cast({client_dying, CRef},
- State = #msstate { dying_clients = DyingClients }) ->
+ State = #msstate { dying_clients = DyingClients,
+ dying_client_index = DyingIndex,
+ current_file_handle = CurHdl,
+ current_file = CurFile }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- noreply(write_message(CRef, <<>>,
- State #msstate { dying_clients = DyingClients1 }));
+ {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
+ true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef,
+ file = CurFile,
+ offset = CurOffset}),
+ noreply(State #msstate { dying_clients = DyingClients1 });
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
- noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
+ noreply(clear_client(CRef, State1));
handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
@@ -1338,7 +1361,8 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) ->
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
should_mask_action(CRef, MsgId,
- State = #msstate { dying_clients = DyingClients }) ->
+ State = #msstate { dying_clients = DyingClients,
+ dying_client_index = DyingIndex }) ->
case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
{false, Location} ->
{false, Location};
@@ -1346,8 +1370,8 @@ should_mask_action(CRef, MsgId,
{true, not_found};
{true, #msg_location { file = File, offset = Offset,
ref_count = RefCount } = Location} ->
- #msg_location { file = DeathFile, offset = DeathOffset } =
- index_lookup(CRef, State),
+ [#dying_client { file = DeathFile, offset = DeathOffset }] =
+ ets:lookup(DyingIndex, CRef),
{case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
{true, _} -> true;
{false, 0} -> false_if_increment;