diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-10 12:11:28 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-10 12:11:28 +0000 |
| commit | f216fd35c47e8321c65e59164ee3754f9c133a3b (patch) | |
| tree | d6f02c431f6fb91128abe6aead59f652dfb275f5 | |
| parent | 99a50e456314243875880cb4e5f5db9691c11e8d (diff) | |
| parent | cc0fd0b7698cc79ac6bd1074c9addfcd90e14f06 (diff) | |
| download | rabbitmq-server-git-f216fd35c47e8321c65e59164ee3754f9c133a3b.tar.gz | |
Merging default into bug23329
| -rw-r--r-- | src/rabbit_msg_store.erl | 159 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
2 files changed, 117 insertions, 46 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b20b61e7b3..4f239096a9 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -81,6 +81,7 @@ file_summary_ets, %% tid of the file summary table dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table + dying_clients_ets, %% tid of the dying clients table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? @@ -361,6 +362,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) -> client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), + ok = server_cast(CState, {client_dying, Ref}), ok = server_cast(CState, {client_delete, Ref}). client_ref(#client_msstate { client_ref = Ref }) -> Ref. @@ -580,6 +582,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + DyingClientsEts = ets:new(rabbit_msg_store_terminal, [set]), + {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), State = #msstate { dir = Dir, @@ -598,6 +602,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, + dying_clients_ets = DyingClientsEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -643,6 +648,7 @@ prioritise_cast(Msg, _State) -> {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; + {client_dying, _Pid} -> 7; _ -> 0 end. @@ -681,11 +687,24 @@ handle_call({contains, Guid}, From, State) -> State1 = contains_message(Guid, From, State), noreply(State1). +handle_cast({client_dying, CRef}, + State = #msstate { dying_clients_ets = DyingClientsEts }) -> + %% Note that we use a separate set for the dying clients in order + %% to keep that set, which is inspected on every write and remove, + %% as small as possible - inspecting the set of all clients would + %% degrade performance with many healthy clients and few dying + %% clients. + true = ets:insert_new(DyingClientsEts, {CRef, const}), + write_message(CRef, <<>>, State); + handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> + State = #msstate { client_refs = ClientRefs, + dying_clients_ets = DyingClientsEts }) -> + true = ets:delete(DyingClientsEts, CRef), State1 = clear_client_callback(CRef, State), - noreply(State1 #msstate { - client_refs = sets:del_element(CRef, ClientRefs) }); + ClientRefs1 = sets:del_element(CRef, ClientRefs), + noreply(remove_message(CRef, CRef, + State1 #msstate { client_refs = ClientRefs1 })); handle_cast({write, CRef, Guid}, State = #msstate { sum_valid_data = SumValid, @@ -705,23 +724,33 @@ handle_cast({write, CRef, Guid}, error -> CTG end, State1 = State #msstate { cref_to_guids = CTG1 }, - case index_lookup(Guid, State1) of - not_found -> + case should_mask_action(CRef, Guid, State1) of + {true, _Location} -> + noreply(State1); + {false, not_found} -> write_message(Guid, Msg, State1); - #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> + {Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }} -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> ok = index_delete(Guid, State1), write_message(Guid, Msg, State1); - [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State1), + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older then the client death + %% message, but seeing as it's being GC'd + %% currently, we'll have to write a new copy, + %% which will then be younger, so ignore this + %% write. + noreply(State1); + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), [_] = ets:update_counter( FileSummaryEts, File, [{#file_summary.valid_total_size, TotalSize}]), noreply(State1 #msstate { sum_valid_data = SumValid + TotalSize }) end; - #msg_location { ref_count = RefCount, file = File } -> + {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC ok = index_update_ref_count(Guid, RefCount + 1, State1), @@ -732,12 +761,13 @@ handle_cast({write, CRef, Guid}, CTG; _ -> CTG1 end, - noreply(State #msstate { cref_to_guids = CTG2 }) + noreply(State1 #msstate { cref_to_guids = CTG2 }) end; handle_cast({remove, CRef, Guids}, State) -> - State1 = lists:foldl(fun (Guid, State2) -> remove_message(Guid, State2) end, - State, Guids), + State1 = lists:foldl( + fun (Guid, State2) -> remove_message(Guid, CRef, State2) end, + State, Guids), State3 = client_confirm(CRef, gb_sets:from_list(Guids), removed, State1), noreply(maybe_compact(State3)); @@ -801,6 +831,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, + dying_clients_ets = DyingClientsEts, client_refs = ClientRefs, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull @@ -814,8 +845,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), store_file_summary(FileSummaryEts, Dir), - [ets:delete(T) || - T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], + [ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, + CurFileCacheEts, DyingClientsEts]], IndexModule:terminate(IndexState), store_recovery_terms([{client_refs, sets:to_list(ClientRefs)}, {index_module, IndexModule}], Dir), @@ -991,34 +1022,48 @@ contains_message(Guid, From, end end. -remove_message(Guid, State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> - #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = - index_lookup_positive_ref_count(Guid, State), - %% only update field, otherwise bad interaction with concurrent GC - Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here because - %% there may be further writes in the mailbox for the same - %% msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, File, State); - [#file_summary {}] -> +remove_message(Guid, CRef, + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> + case should_mask_action(CRef, Guid, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg + %% whilst it's being GC'd. ASSERTION: + %% [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = + fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, + case RefCount of + %% don't remove from CUR_FILE_CACHE_ETS_NAME here + %% because there may be further writes in the mailbox + %% for the same msg. + 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, Guid, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + [_] = ets:update_counter( + FileSummaryEts, File, + [{#file_summary.valid_total_size, + -TotalSize}]), + delete_file_if_empty( + File, State #msstate { + sum_valid_data = SumValid - TotalSize }) + end; + _ -> ok = decrement_cache(DedupCacheEts, Guid), ok = Dec(), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, -TotalSize}]), - delete_file_if_empty( - File, State #msstate { - sum_valid_data = SumValid - TotalSize }) - end; - _ -> ok = decrement_cache(DedupCacheEts, Guid), - ok = Dec(), - State + State + end end. add_to_pending_gc_completion( @@ -1040,8 +1085,8 @@ run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending_action({remove, Guid}, State) -> - remove_message(Guid, State). +run_pending_action({remove, Guid, CRef}, State) -> + remove_message(Guid, CRef, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> try @@ -1074,6 +1119,30 @@ client_confirm(CRef, Guids, ActionTaken, error -> State end. +%% Detect whether the Guid is older or younger than the client's death +%% msg (if there is one). If the msg is older than the client death +%% msg, and it has a 0 ref_count we must only alter the ref_count, +%% not rewrite the msg - rewriting it would make it younger than the +%% death msg and thus should be ignored. Note that this will +%% (correctly) return false when testing to remove the death msg +%% itself. +should_mask_action(CRef, Guid, + State = #msstate { dying_clients_ets = DyingClientsEts }) -> + case {ets:lookup(DyingClientsEts, CRef), index_lookup(Guid, State)} of + {[], Location} -> + {false, Location}; + {[{_CRef, const}], not_found} -> + {true, not_found}; + {[{_CRef, const}], #msg_location { file = File, offset = Offset, + ref_count = RefCount } = Location} -> + #msg_location { file = DeathFile, offset = DeathOffset } = + index_lookup(CRef, State), + {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of + {true, _} -> true; + {false, 0} -> false_if_increment; + {false, _} -> false + end, Location} + end. %%---------------------------------------------------------------------------- %% file helper functions diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9614dc9043..d913092cce 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1696,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> false -> ?TRANSIENT_MSG_STORE end, MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), - {A, B} = + {A, B = [{_SeqId, LastGuidWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), @@ -1705,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> ok = rabbit_msg_store:write(Guid, Guid, MSCState), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} end, {Qi, []}, SeqIds), + %% do this just to force all of the publishes through to the msg_store: + true = rabbit_msg_store:contains(LastGuidWritten, MSCState), ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. |
