summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-07-08 17:17:14 +0400
committerGitHub <noreply@github.com>2016-07-08 17:17:14 +0400
commitd04cb67d10d95d2482408bc6d5dac7b49e1db3ed (patch)
treec5a5de570f7847afc3f6ad0a48fbbe8cc0c1c38f /src
parentaeb5c6b6d3e376732e03d0788cd9c76e7a79e3c4 (diff)
parent730ffa5f906f6e1aca34a333d5d6c7350ed3e2dd (diff)
downloadrabbitmq-server-git-d04cb67d10d95d2482408bc6d5dac7b49e1db3ed.tar.gz
Merge pull request #855 from rabbitmq/rabbitmq-server-839
Use separate index to store dying clients file offsets
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl40
1 files changed, 32 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 22af95ced1..d3ff077c8b 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -91,6 +91,8 @@
flying_ets,
%% set of dying clients
dying_clients,
+ %% index of file positions for client death messages
+ dying_client_index,
%% map of references of all registered clients
%% to callbacks
clients,
@@ -131,6 +133,12 @@
msg_store
}).
+-record(dying_client,
+ { client_ref,
+ file,
+ offset
+ }).
+
%%----------------------------------------------------------------------------
-export_type([gc_state/0, file_num/0]).
@@ -416,6 +424,10 @@
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%%
+%% Client termination messages are stored in a separate ets index to
+%% avoid filling primary message store index and message files with
+%% client termination messages.
+%%
%% When the msg_store has a backlog (i.e. it has unprocessed messages
%% in its mailbox / gen_server priority queue), a further optimisation
%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
@@ -687,7 +699,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
end.
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
- dying_clients = DyingClients }) ->
+ dying_clients = DyingClients,
+ dying_client_index = DyingIndex }) ->
+ ets:delete(DyingIndex, CRef),
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
dying_clients = sets:del_element(CRef, DyingClients) }.
@@ -741,6 +755,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
+ DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
+ [set, public, {keypos, #dying_client.client_ref}]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
@@ -772,6 +788,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
cur_file_cache_ets = CurFileCacheEts,
flying_ets = FlyingEts,
dying_clients = sets:new(),
+ dying_client_index = DyingIndex,
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -848,15 +865,21 @@ handle_call({contains, MsgId}, From, State) ->
noreply(State1).
handle_cast({client_dying, CRef},
- State = #msstate { dying_clients = DyingClients }) ->
+ State = #msstate { dying_clients = DyingClients,
+ dying_client_index = DyingIndex,
+ current_file_handle = CurHdl,
+ current_file = CurFile }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- noreply(write_message(CRef, <<>>,
- State #msstate { dying_clients = DyingClients1 }));
+ {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
+ true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef,
+ file = CurFile,
+ offset = CurOffset}),
+ noreply(State #msstate { dying_clients = DyingClients1 });
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
- noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
+ noreply(clear_client(CRef, State1));
handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
@@ -1334,7 +1357,8 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) ->
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
should_mask_action(CRef, MsgId,
- State = #msstate { dying_clients = DyingClients }) ->
+ State = #msstate { dying_clients = DyingClients,
+ dying_client_index = DyingIndex }) ->
case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
{false, Location} ->
{false, Location};
@@ -1342,8 +1366,8 @@ should_mask_action(CRef, MsgId,
{true, not_found};
{true, #msg_location { file = File, offset = Offset,
ref_count = RefCount } = Location} ->
- #msg_location { file = DeathFile, offset = DeathOffset } =
- index_lookup(CRef, State),
+ [#dying_client { file = DeathFile, offset = DeathOffset }] =
+ ets:lookup(DyingIndex, CRef),
{case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
{true, _} -> true;
{false, 0} -> false_if_increment;