diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-09 07:02:41 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-09-09 07:02:41 +0100 |
| commit | 51575c57bee967400a061676b49e6aee6841c386 (patch) | |
| tree | 97b4e184bc2230408b20846e61e4ec664a187402 | |
| parent | 6647baf5521f35cdde481c09e542ba0a7f51ffad (diff) | |
| download | rabbitmq-server-git-51575c57bee967400a061676b49e6aee6841c386.tar.gz | |
cosmetic tweaks and minor simplifications
| -rw-r--r-- | src/rabbit_msg_store.erl | 67 |
1 files changed, 31 insertions, 36 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 6f3f70e11e..b01c3b1a4a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -326,10 +326,9 @@ read(Server, Guid, Server, 2, {read, Guid}, infinity), CState} end, case index_lookup_positive_refcount(Guid, CState) of - not_found -> - Defer(); - MsgLocation -> - client_read1(Server, MsgLocation, Defer, CState) + not_found -> Defer(); + MsgLocation -> client_read1(Server, MsgLocation, Defer, + CState) end; [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -618,16 +617,14 @@ handle_cast({write, Guid}, file_size = FileSize } = Summary] = ets:lookup(FileSummaryEts, File), case Locked of - true -> - ok = index_delete(Guid, State), - write_message(Guid, Msg, 1, State); - false -> - ok = index_update_fields( - Guid, {#msg_location.ref_count, 1}, State), - ok = add_to_file_summary(Summary, TotalSize, File, FileSize, - State), - noreply(State #msstate { - sum_valid_data = SumValid + TotalSize }) + true -> ok = index_delete(Guid, State), + write_message(Guid, Msg, 1, State); + false -> ok = index_update_fields( + Guid, {#msg_location.ref_count, 1}, State), + ok = add_to_file_summary(Summary, TotalSize, FileSize, + State), + noreply(State #msstate { + sum_valid_data = SumValid + TotalSize }) end; #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only @@ -803,16 +800,16 @@ write_message(Guid, Msg, RefCount, locked = false, file_size = FileSize } = Summary] = ets:lookup(FileSummaryEts, CurFile), - ok = add_to_file_summary(Summary, TotalSize, CurFile, FileSize + TotalSize, - State), + ok = add_to_file_summary(Summary, TotalSize, FileSize + TotalSize, State), NextOffset = CurOffset + TotalSize, noreply(maybe_roll_to_new_file( NextOffset, State #msstate { sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize })). -add_to_file_summary(#file_summary { valid_total_size = ValidTotalSize }, - TotalSize, File, FileSize, +add_to_file_summary(#file_summary { file = File, + valid_total_size = ValidTotalSize }, + TotalSize, FileSize, #msstate { file_summary_ets = FileSummaryEts }) -> ValidTotalSize1 = ValidTotalSize + TotalSize, true = ets:update_element( @@ -916,6 +913,10 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, total_size = TotalSize } = index_lookup(Guid, State), + %% only update field, otherwise bad interaction with concurrent GC + Dec = fun () -> index_update_fields( + Guid, {#msg_location.ref_count, RefCount - 1}, State) + end, case RefCount of 1 -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because @@ -926,25 +927,20 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of - true -> - add_to_pending_gc_completion({remove, Guid}, State); - false -> - ok = index_update_fields( - Guid, {#msg_location.ref_count, RefCount - 1}, - State), - ValidTotalSize1 = ValidTotalSize - TotalSize, - true = - ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}]), - State1 = delete_file_if_empty(File, State), - State1 #msstate { sum_valid_data = SumValid - TotalSize } + true -> add_to_pending_gc_completion({remove, Guid}, State); + false -> ok = Dec(), + true = ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, + ValidTotalSize - TotalSize}]), + delete_file_if_empty( + File, + State #msstate { + sum_valid_data = SumValid - TotalSize }) end; _ when 1 < RefCount -> ok = decrement_cache(DedupCacheEts, Guid), - %% only update field, otherwise bad interaction with concurrent GC - ok = index_update_fields( - Guid, {#msg_location.ref_count, RefCount - 1}, State), + ok = Dec(), State end. @@ -1618,8 +1614,7 @@ find_unremoved_messages_in_file(File, lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> case Index:lookup(Guid, IndexState) of #msg_location { file = File, total_size = TotalSize, - ref_count = 0, - offset = Offset } -> + offset = Offset, ref_count = 0 } -> ok = Index:delete(Guid, IndexState), Acc; #msg_location { file = File, total_size = TotalSize, |
