diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-06 07:20:04 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-06 07:20:04 +0100 |
| commit | 9664f07c47a2adb0128b9bcef0615abd8ea022ab (patch) | |
| tree | 0f24d55ab922ca876cdef5a03e112230cee3b1c5 /src | |
| parent | 3946353d89493d26bddcef75a7c7008998529cce (diff) | |
| download | rabbitmq-server-git-9664f07c47a2adb0128b9bcef0615abd8ea022ab.tar.gz | |
cosmetic
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 168 |
1 files changed, 92 insertions, 76 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 4dc390f66c..d92d7aa3e5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -310,7 +310,7 @@ write(Server, Guid, Msg, {gen_server2:cast(Server, {write, Guid, Msg}), CState}. read(Server, Guid, - CState = #client_msstate { dedup_cache_ets = DedupCacheEts, + CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache case fetch_and_increment_cache(DedupCacheEts, Guid) of @@ -389,14 +389,6 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). -update_msg_cache(CacheEts, Guid, Msg) -> - case ets:insert_new(CacheEts, {Guid, Msg, 1}) of - true -> ok; - false -> safe_ets_update_counter_ok( - CacheEts, Guid, {3, +1}, - fun () -> update_msg_cache(CacheEts, Guid, Msg) end) - end. - client_read1(Server, #msg_location { guid = Guid, file = File } = MsgLocation, Defer, @@ -539,7 +531,6 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> end end, - InitFile = 0, {FileSummaryRecovered, FileSummaryEts} = recover_file_summary(AllCleanShutdown, Dir), DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), @@ -550,7 +541,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> State = #msstate { dir = Dir, index_module = IndexModule, index_state = IndexState, - current_file = InitFile, + current_file = 0, current_file_handle = undefined, file_handle_cache = dict:new(), on_sync = [], @@ -641,10 +632,10 @@ handle_cast({write, Guid, Msg}, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - right = undefined, - locked = false, - file_size = FileSize }] = + contiguous_top = ContiguousTop, + right = undefined, + locked = false, + file_size = FileSize }] = ets:lookup(FileSummaryEts, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, ContiguousTop1 = if CurOffset =:= ContiguousTop -> @@ -664,7 +655,7 @@ handle_cast({write, Guid, Msg}, maybe_roll_to_new_file( NextOffset, State #msstate { sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize }))); + sum_file_size = SumFileSize + TotalSize }))); #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC @@ -705,8 +696,8 @@ handle_cast(sync, State) -> noreply(internal_sync(State)); handle_cast({gc_done, Reclaimed, Source, Dest}, - State = #msstate { sum_file_size = SumFileSize, - gc_active = {Source, Dest}, + State = #msstate { sum_file_size = SumFileSize, + gc_active = {Source, Dest}, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts }) -> %% GC done, so now ensure that any clients that have open fhs to @@ -721,9 +712,10 @@ handle_cast({gc_done, Reclaimed, Source, Dest}, %% we always move data left, so Source has gone and was on the %% right, so need to make dest = source.right.left, and also %% dest.right = source.right - [#file_summary { left = Dest, right = SourceRight, locked = true, - readers = 0 }] = - ets:lookup(FileSummaryEts, Source), + [#file_summary { left = Dest, + right = SourceRight, + locked = true, + readers = 0 }] = ets:lookup(FileSummaryEts, Source), %% this could fail if SourceRight == undefined ets:update_element(FileSummaryEts, SourceRight, {#file_summary.left, Dest}), @@ -733,7 +725,7 @@ handle_cast({gc_done, Reclaimed, Source, Dest}, true = ets:delete(FileSummaryEts, Source), noreply(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false })); + gc_active = false })); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -819,7 +811,7 @@ sort_file_names(FileNames) -> FileNames). internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> + on_sync = Syncs }) -> State1 = stop_sync_timer(State), case Syncs of [] -> State1; @@ -829,8 +821,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. -read_message(Guid, From, State = - #msstate { dedup_cache_ets = DedupCacheEts }) -> +read_message(Guid, From, + State = #msstate { dedup_cache_ets = DedupCacheEts }) -> case index_lookup(Guid, State) of not_found -> gen_server2:reply(From, not_found), State; @@ -846,11 +838,11 @@ read_message(Guid, From, State = read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, file = File, offset = Offset } = MsgLoc, - State = #msstate { current_file = CurFile, + State = #msstate { current_file = CurFile, current_file_handle = CurHdl, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }) -> + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> case File =:= CurFile of true -> {Msg, State1} = @@ -913,6 +905,14 @@ maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg) maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) -> ok. +update_msg_cache(CacheEts, Guid, Msg) -> + case ets:insert_new(CacheEts, {Guid, Msg, 1}) of + true -> ok; + false -> safe_ets_update_counter_ok( + CacheEts, Guid, {3, +1}, + fun () -> update_msg_cache(CacheEts, Guid, Msg) end) + end. + contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> case index_lookup(Guid, State) of not_found -> @@ -929,9 +929,9 @@ contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> end end. -remove_message(Guid, State = #msstate { sum_valid_data = SumValid, +remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> + dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize } = index_lookup(Guid, State), @@ -942,8 +942,8 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, %% msg. ok = remove_cache_entry(DedupCacheEts, Guid), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - locked = Locked }] = + contiguous_top = ContiguousTop, + locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of true -> @@ -1063,7 +1063,6 @@ store_file_summary(Tid, Dir) -> [{extended_info, [object_count]}]), ets:delete(Tid). - preallocate(Hdl, FileSizeLimit, FinalPos) -> {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), ok = file_handle_cache:truncate(Hdl), @@ -1130,7 +1129,7 @@ decrement_cache(DedupCacheEts, Guid) -> %%---------------------------------------------------------------------------- index_lookup(Key, #client_msstate { index_module = Index, - index_state = State }) -> + index_state = State }) -> Index:lookup(Key, State); index_lookup(Key, #msstate { index_module = Index, index_state = State }) -> @@ -1143,14 +1142,14 @@ index_update(Obj, #msstate { index_module = Index, index_state = State }) -> Index:update(Obj, State). index_update_fields(Key, Updates, #msstate { index_module = Index, - index_state = State }) -> + index_state = State }) -> Index:update_fields(Key, Updates, State). index_delete(Key, #msstate { index_module = Index, index_state = State }) -> Index:delete(Key, State). index_delete_by_file(File, #msstate { index_module = Index, - index_state = State }) -> + index_state = State }) -> Index:delete_by_file(File, State). %%---------------------------------------------------------------------------- @@ -1315,13 +1314,14 @@ build_index(true, _Files, State = #msstate { file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, - file_size = FileSize, file = File }, + file_size = FileSize, + file = File }, {_Offset, State1 = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize }}) -> + sum_file_size = SumFileSize }}) -> {FileSize, State1 #msstate { sum_valid_data = SumValid + ValidTotalSize, - sum_file_size = SumFileSize + FileSize, - current_file = File }} + sum_file_size = SumFileSize + FileSize, + current_file = File }} end, {0, State}, FileSummaryEts); build_index(false, Files, State) -> {ok, Pid} = gatherer:start_link(), @@ -1333,8 +1333,8 @@ build_index(false, Files, State) -> build_index(Gatherer, Left, [], State = #msstate { file_summary_ets = FileSummaryEts, - sum_valid_data = SumValid, - sum_file_size = SumFileSize }) -> + sum_valid_data = SumValid, + sum_file_size = SumFileSize }) -> case gatherer:fetch(Gatherer) of finished -> ok = rabbit_misc:unlink_and_capture_exit(Gatherer), @@ -1351,7 +1351,7 @@ build_index(Gatherer, Left, [], build_index(Gatherer, Left, [], State #msstate { sum_valid_data = SumValid + ValidTotalSize, - sum_file_size = SumFileSize + FileSize }) + sum_file_size = SumFileSize + FileSize }) end; build_index(Gatherer, Left, [File|Files], State) -> Child = make_ref(), @@ -1395,10 +1395,14 @@ build_index_worker( [F|_] -> {F, FileSize} end, ok = gatherer:produce(Gatherer, #file_summary { - file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = false, - left = Left, right = Right, file_size = FileSize1, - readers = 0 }), + file = File, + valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + left = Left, + right = Right, + file_size = FileSize1, + locked = false, + readers = 0 }), ok = gatherer:finished(Gatherer, Ref). %%---------------------------------------------------------------------------- @@ -1419,11 +1423,15 @@ maybe_roll_to_new_file( {ok, NextHdl} = open_file( Dir, filenum_to_name(NextFile), ?WRITE_MODE), - true = ets:insert_new( - FileSummaryEts, #file_summary { - file = NextFile, valid_total_size = 0, contiguous_top = 0, - left = CurFile, right = undefined, file_size = 0, - locked = false, readers = 0 }), + true = ets:insert_new(FileSummaryEts, #file_summary { + file = NextFile, + valid_total_size = 0, + contiguous_top = 0, + left = CurFile, + right = undefined, + file_size = 0, + locked = false, + readers = 0 }), true = ets:update_element(FileSummaryEts, CurFile, {#file_summary.right, NextFile}), true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}), @@ -1481,11 +1489,13 @@ find_files_to_gc(FileSummaryEts, N, First) -> find_files_to_gc(_FileSummaryEts, _N, #file_summary {}, [], Pairs) -> lists:reverse(Pairs); find_files_to_gc(FileSummaryEts, N, - #file_summary { right = Source, file = Dest, + #file_summary { right = Source, + file = Dest, valid_total_size = DestValid }, - [SourceObj = #file_summary { left = Dest, right = SourceRight, + [SourceObj = #file_summary { left = Dest, + right = SourceRight, valid_total_size = SourceValid, - file = Source }], + file = Source }], Pairs) when DestValid + SourceValid =< ?FILE_SIZE_LIMIT andalso not is_atom(SourceRight) -> Pair = {Source, Dest}, @@ -1502,13 +1512,17 @@ find_files_to_gc(FileSummaryEts, N, _Left, delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; -delete_file_if_empty(File, State = - #msstate { dir = Dir, sum_file_size = SumFileSize, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> - [#file_summary { valid_total_size = ValidData, file_size = FileSize, - left = Left, right = Right, locked = false }] - = ets:lookup(FileSummaryEts, File), +delete_file_if_empty(File, State = #msstate { + dir = Dir, + sum_file_size = SumFileSize, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> + [#file_summary { valid_total_size = ValidData, + left = Left, + right = Right, + file_size = FileSize, + locked = false }] = + ts:lookup(FileSummaryEts, File), case ValidData of %% we should NEVER find the current file in here hence right %% should always be a file, not undefined @@ -1540,15 +1554,17 @@ delete_file_if_empty(File, State = gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> [SourceObj = #file_summary { - readers = SourceReaders, - valid_total_size = SourceValidData, left = DestFile, - file_size = SourceFileSize, locked = true }] = - ets:lookup(FileSummaryEts, SourceFile), + readers = SourceReaders, + valid_total_size = SourceValidData, + left = DestFile, + file_size = SourceFileSize, + locked = true }] = ets:lookup(FileSummaryEts, SourceFile), [DestObj = #file_summary { - readers = DestReaders, - valid_total_size = DestValidData, right = SourceFile, - file_size = DestFileSize, locked = true }] = - ets:lookup(FileSummaryEts, DestFile), + readers = DestReaders, + valid_total_size = DestValidData, + right = SourceFile, + file_size = DestFileSize, + locked = true }] = ets:lookup(FileSummaryEts, DestFile), case SourceReaders =:= 0 andalso DestReaders =:= 0 of true -> TotalValidData = DestValidData + SourceValidData, @@ -1565,13 +1581,13 @@ gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> gc(SourceFile, DestFile, State) end. -combine_files(#file_summary { file = Source, +combine_files(#file_summary { file = Source, valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, + left = Destination }, + #file_summary { file = Destination, valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, - right = Source }, + contiguous_top = DestinationContiguousTop, + right = Source }, State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), |
