diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-07-08 17:17:14 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-07-08 17:17:14 +0400 |
| commit | d04cb67d10d95d2482408bc6d5dac7b49e1db3ed (patch) | |
| tree | c5a5de570f7847afc3f6ad0a48fbbe8cc0c1c38f /src | |
| parent | aeb5c6b6d3e376732e03d0788cd9c76e7a79e3c4 (diff) | |
| parent | 730ffa5f906f6e1aca34a333d5d6c7350ed3e2dd (diff) | |
| download | rabbitmq-server-git-d04cb67d10d95d2482408bc6d5dac7b49e1db3ed.tar.gz | |
Merge pull request #855 from rabbitmq/rabbitmq-server-839
Use separate index to store dying clients file offsets
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 40 |
1 files changed, 32 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 22af95ced1..d3ff077c8b 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -91,6 +91,8 @@ flying_ets, %% set of dying clients dying_clients, + %% index of file positions for client death messages + dying_client_index, %% map of references of all registered clients %% to callbacks clients, @@ -131,6 +133,12 @@ msg_store }). +-record(dying_client, + { client_ref, + file, + offset + }). + %%---------------------------------------------------------------------------- -export_type([gc_state/0, file_num/0]). @@ -416,6 +424,10 @@ %% performance with many healthy clients and few, if any, dying %% clients, which is the typical case. %% +%% Client termination messages are stored in a separate ets index to +%% avoid filling primary message store index and message files with +%% client termination messages. +%% %% When the msg_store has a backlog (i.e. it has unprocessed messages %% in its mailbox / gen_server priority queue), a further optimisation %% opportunity arises: we can eliminate pairs of 'write' and 'remove' @@ -687,7 +699,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, end. clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, - dying_clients = DyingClients }) -> + dying_clients = DyingClients, + dying_client_index = DyingIndex }) -> + ets:delete(DyingIndex, CRef), State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), dying_clients = sets:del_element(CRef, DyingClients) }. @@ -741,6 +755,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), + DyingIndex = ets:new(rabbit_msg_store_dying_client_index, + [set, public, {keypos, #dying_client.client_ref}]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -772,6 +788,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, flying_ets = FlyingEts, dying_clients = sets:new(), + dying_client_index = DyingIndex, clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -848,15 +865,21 @@ handle_call({contains, MsgId}, From, State) -> noreply(State1). handle_cast({client_dying, CRef}, - State = #msstate { dying_clients = DyingClients }) -> + State = #msstate { dying_clients = DyingClients, + dying_client_index = DyingIndex, + current_file_handle = CurHdl, + current_file = CurFile }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - noreply(write_message(CRef, <<>>, - State #msstate { dying_clients = DyingClients1 })); + {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), + true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef, + file = CurFile, + offset = CurOffset}), + noreply(State #msstate { dying_clients = DyingClients1 }); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, - noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); + noreply(clear_client(CRef, State1)); handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, @@ -1334,7 +1357,8 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) -> %% msg and thus should be ignored. Note that this (correctly) returns %% false when testing to remove the death msg itself. should_mask_action(CRef, MsgId, - State = #msstate { dying_clients = DyingClients }) -> + State = #msstate { dying_clients = DyingClients, + dying_client_index = DyingIndex }) -> case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of {false, Location} -> {false, Location}; @@ -1342,8 +1366,8 @@ should_mask_action(CRef, MsgId, {true, not_found}; {true, #msg_location { file = File, offset = Offset, ref_count = RefCount } = Location} -> - #msg_location { file = DeathFile, offset = DeathOffset } = - index_lookup(CRef, State), + [#dying_client { file = DeathFile, offset = DeathOffset }] = + ets:lookup(DyingIndex, CRef), {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of {true, _} -> true; {false, 0} -> false_if_increment; |
