diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-06-21 15:24:05 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-06-21 15:24:05 +0100 |
| commit | 2e1ae067bef44838e191647d4c1c1b69c183b10c (patch) | |
| tree | 3a85c348349d4b694bd5264413f49820ca792b4f | |
| parent | 81ff300a43640acd7a6c36f2dc31a73f8dc05bb2 (diff) | |
| download | rabbitmq-server-git-2e1ae067bef44838e191647d4c1c1b69c183b10c.tar.gz | |
Use separate index to store dying clients file offsets
| -rw-r--r-- | src/rabbit_msg_store.erl | 40 |
1 files changed, 32 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2300664687..68f346ca70 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -91,6 +91,8 @@ flying_ets, %% set of dying clients dying_clients, + %% index of file positions for client death messages + dying_client_index, %% map of references of all registered clients %% to callbacks clients, @@ -131,6 +133,12 @@ msg_store }). +-record(dying_client, + { client_ref, + file, + offset + }). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -420,6 +428,10 @@ %% performance with many healthy clients and few, if any, dying %% clients, which is the typical case. %% +%% Client termination messages are stored in a separate ets index to +%% avoid filling primary message store index and message files with +%% client termination messages. +%% %% When the msg_store has a backlog (i.e. it has unprocessed messages %% in its mailbox / gen_server priority queue), a further optimisation %% opportunity arises: we can eliminate pairs of 'write' and 'remove' @@ -691,7 +703,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, end. clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, - dying_clients = DyingClients }) -> + dying_clients = DyingClients, + dying_client_index = DyingIndex }) -> + true = ets:delete(DyingIndex, CRef), State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), dying_clients = sets:del_element(CRef, DyingClients) }. @@ -745,6 +759,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), + DyingIndex = ets:new(rabbit_msg_store_dying_client_index, + [set, public, {keypos, #dying_client.client_ref}]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -776,6 +792,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, flying_ets = FlyingEts, dying_clients = sets:new(), + dying_client_index = DyingIndex, clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -852,15 +869,21 @@ handle_call({contains, MsgId}, From, State) -> noreply(State1). handle_cast({client_dying, CRef}, - State = #msstate { dying_clients = DyingClients }) -> + State = #msstate { dying_clients = DyingClients, + dying_client_index = DyingIndex, + current_file_handle = CurHdl, + current_file = CurFile }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - noreply(write_message(CRef, <<>>, - State #msstate { dying_clients = DyingClients1 })); + {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), + true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef, + file = CurFile, + offset = CurOffset}), + noreply(State #msstate { dying_clients = DyingClients1 }); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, - noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); + noreply(clear_client(CRef, State1)); handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, @@ -1338,7 +1361,8 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) -> %% msg and thus should be ignored. Note that this (correctly) returns %% false when testing to remove the death msg itself. should_mask_action(CRef, MsgId, - State = #msstate { dying_clients = DyingClients }) -> + State = #msstate { dying_clients = DyingClients, + dying_client_index = DyingIndex }) -> case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of {false, Location} -> {false, Location}; @@ -1346,8 +1370,8 @@ should_mask_action(CRef, MsgId, {true, not_found}; {true, #msg_location { file = File, offset = Offset, ref_count = RefCount } = Location} -> - #msg_location { file = DeathFile, offset = DeathOffset } = - index_lookup(CRef, State), + [#dying_client { file = DeathFile, offset = DeathOffset }] = + ets:lookup(DyingIndex, CRef), {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of {true, _} -> true; {false, 0} -> false_if_increment; |
