diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-16 00:13:42 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-16 00:13:42 +0100 |
| commit | c5c7e483e1d12222ba84fc7d7d805de6257cbb6d (patch) | |
| tree | 7230469e84605bb4f65bd016b148fb008c5707c3 | |
| parent | 3afb316eeec2b5354497d949a46e3251e57bfda2 (diff) | |
| download | rabbitmq-server-git-c5c7e483e1d12222ba84fc7d7d805de6257cbb6d.tar.gz | |
Permit 0 ref counts and don't actually forget about messages until the file gets removed
| -rw-r--r-- | src/rabbit_msg_store.erl | 118 |
1 files changed, 78 insertions, 40 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 4600efc124..c55380d332 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -332,9 +332,11 @@ read(Server, Guid, Server, 2, {read, Guid}, infinity), CState} end, case index_lookup(Guid, CState) of - not_found -> Defer(); - MsgLocation -> client_read1(Server, MsgLocation, Defer, - CState) + Result when Result =:= not_found orelse + (Result #msg_location.ref_count =:= 0) -> + Defer(); + MsgLocation -> + client_read1(Server, MsgLocation, Defer, CState) end; [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -476,7 +478,8 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% badarg scenario above, but we don't have a missing file %% - we just have the /wrong/ file). case index_lookup(Guid, CState) of - #msg_location { file = File } = MsgLocation -> + #msg_location { file = File, ref_count = RefCount } = + MsgLocation when RefCount > 0 -> %% Still the same file. mark_handle_open(FileHandlesEts, File), @@ -619,37 +622,48 @@ handle_cast({write, Guid, Msg}, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + Write = + fun () -> + {ok, CurOffset} = + file_handle_cache:current_virtual_offset(CurHdl), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), + ok = index_insert( + #msg_location { + guid = Guid, ref_count = 1, file = CurFile, + offset = CurOffset, total_size = TotalSize }, State), + [#file_summary { right = undefined, + locked = false, + file_size = FileSize } = Summary] = + ets:lookup(FileSummaryEts, CurFile), + ok = add_to_file_summary(Summary, TotalSize, CurOffset, CurFile, + FileSize + TotalSize, State), + NextOffset = CurOffset + TotalSize, + noreply( + maybe_roll_to_new_file( + NextOffset, State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize })) + end, case index_lookup(Guid, State) of not_found -> - %% New message, lots to do - {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), - ok = index_insert(#msg_location { - guid = Guid, ref_count = 1, file = CurFile, - offset = CurOffset, total_size = TotalSize }, - State), - [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - right = undefined, - locked = false, - file_size = FileSize }] = - ets:lookup(FileSummaryEts, CurFile), - ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = case CurOffset =:= ContiguousTop of - true -> ValidTotalSize1; - false -> ContiguousTop - end, - true = ets:update_element( - FileSummaryEts, CurFile, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}, - {#file_summary.file_size, FileSize + TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply( - maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); + Write(); + #msg_location { ref_count = 0, file = File, offset = Offset, + total_size = TotalSize } -> + [#file_summary { locked = Locked, + file_size = FileSize } = Summary] = + ets:lookup(FileSummaryEts, File), + case Locked of + true -> + ok = index_delete(Guid, State), + Write(); + false -> + ok = index_update_fields( + Guid, {#msg_location.ref_count, 1}, State), + ok = add_to_file_summary(Summary, TotalSize, Offset, File, + FileSize, State), + noreply(State #msstate { + sum_valid_data = SumValid + TotalSize }) + end; #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC @@ -716,6 +730,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, [{#file_summary.locked, false}, {#file_summary.right, SrcRight}]), true = ets:delete(FileSummaryEts, Src), + ok = index_delete_by_file(Src, State), noreply( maybe_compact(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, @@ -804,10 +819,28 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. +add_to_file_summary(#file_summary { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop }, + TotalSize, Offset, File, FileSize, + #msstate { file_summary_ets = FileSummaryEts }) -> + ValidTotalSize1 = ValidTotalSize + TotalSize, + ContiguousTop1 = case Offset =:= ContiguousTop of + true -> ValidTotalSize1; + false -> ContiguousTop + end, + true = + ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, ValidTotalSize1}, + {#file_summary.contiguous_top, ContiguousTop1}, + {#file_summary.file_size, FileSize}]), + ok. + read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> case index_lookup(Guid, State) of - not_found -> + Result when Result =:= not_found orelse + (Result #msg_location.ref_count =:= 0) -> gen_server2:reply(From, not_found), State; MsgLocation -> @@ -880,7 +913,8 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> case index_lookup(Guid, State) of - not_found -> + Result when Result =:= not_found orelse + (Result #msg_location.ref_count =:= 0) -> gen_server2:reply(From, false), State; #msg_location { file = File } -> @@ -914,7 +948,9 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, true -> add_to_pending_gc_completion({remove, Guid}, State); false -> - ok = index_delete(Guid, State), + ok = index_update_fields( + Guid, {#msg_location.ref_count, RefCount - 1}, + State), ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, true = ets:update_element( @@ -927,9 +963,8 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, _ when 1 < RefCount -> ok = decrement_cache(DedupCacheEts, Guid), %% only update field, otherwise bad interaction with concurrent GC - ok = index_update_fields(Guid, - {#msg_location.ref_count, RefCount - 1}, - State), + ok = index_update_fields( + Guid, {#msg_location.ref_count, RefCount - 1}, State), State end. @@ -1501,6 +1536,7 @@ delete_file_if_empty(File, State = #msstate { end, true = mark_handle_to_close(FileHandlesEts, File), true = ets:delete(FileSummaryEts, File), + ok = index_delete_by_file(File, State), State1 = close_handle(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), State1 #msstate { sum_file_size = SumFileSize - FileSize }; @@ -1611,7 +1647,9 @@ find_unremoved_messages_in_file(File, lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of #msg_location { file = File, total_size = TotalSize, - offset = Offset } = Entry -> + ref_count = RefCount, + offset = Offset } = Entry + when RefCount > 0 -> {[ Entry | List ], TotalSize + Size}; _ -> Acc |
