summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl67
1 files changed, 31 insertions, 36 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 6f3f70e11e..b01c3b1a4a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -326,10 +326,9 @@ read(Server, Guid,
Server, 2, {read, Guid}, infinity),
CState} end,
case index_lookup_positive_refcount(Guid, CState) of
- not_found ->
- Defer();
- MsgLocation ->
- client_read1(Server, MsgLocation, Defer, CState)
+ not_found -> Defer();
+ MsgLocation -> client_read1(Server, MsgLocation, Defer,
+ CState)
end;
[{Guid, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
@@ -618,16 +617,14 @@ handle_cast({write, Guid},
file_size = FileSize } = Summary] =
ets:lookup(FileSummaryEts, File),
case Locked of
- true ->
- ok = index_delete(Guid, State),
- write_message(Guid, Msg, 1, State);
- false ->
- ok = index_update_fields(
- Guid, {#msg_location.ref_count, 1}, State),
- ok = add_to_file_summary(Summary, TotalSize, File, FileSize,
- State),
- noreply(State #msstate {
- sum_valid_data = SumValid + TotalSize })
+ true -> ok = index_delete(Guid, State),
+ write_message(Guid, Msg, 1, State);
+ false -> ok = index_update_fields(
+ Guid, {#msg_location.ref_count, 1}, State),
+ ok = add_to_file_summary(Summary, TotalSize, FileSize,
+ State),
+ noreply(State #msstate {
+ sum_valid_data = SumValid + TotalSize })
end;
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
@@ -803,16 +800,16 @@ write_message(Guid, Msg, RefCount,
locked = false,
file_size = FileSize } = Summary] =
ets:lookup(FileSummaryEts, CurFile),
- ok = add_to_file_summary(Summary, TotalSize, CurFile, FileSize + TotalSize,
- State),
+ ok = add_to_file_summary(Summary, TotalSize, FileSize + TotalSize, State),
NextOffset = CurOffset + TotalSize,
noreply(maybe_roll_to_new_file(
NextOffset, State #msstate {
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize })).
-add_to_file_summary(#file_summary { valid_total_size = ValidTotalSize },
- TotalSize, File, FileSize,
+add_to_file_summary(#file_summary { file = File,
+ valid_total_size = ValidTotalSize },
+ TotalSize, FileSize,
#msstate { file_summary_ets = FileSummaryEts }) ->
ValidTotalSize1 = ValidTotalSize + TotalSize,
true = ets:update_element(
@@ -916,6 +913,10 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
total_size = TotalSize } = index_lookup(Guid, State),
+ %% only update field, otherwise bad interaction with concurrent GC
+ Dec = fun () -> index_update_fields(
+ Guid, {#msg_location.ref_count, RefCount - 1}, State)
+ end,
case RefCount of
1 ->
%% don't remove from CUR_FILE_CACHE_ETS_NAME here because
@@ -926,25 +927,20 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
- true ->
- add_to_pending_gc_completion({remove, Guid}, State);
- false ->
- ok = index_update_fields(
- Guid, {#msg_location.ref_count, RefCount - 1},
- State),
- ValidTotalSize1 = ValidTotalSize - TotalSize,
- true =
- ets:update_element(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, ValidTotalSize1}]),
- State1 = delete_file_if_empty(File, State),
- State1 #msstate { sum_valid_data = SumValid - TotalSize }
+ true -> add_to_pending_gc_completion({remove, Guid}, State);
+ false -> ok = Dec(),
+ true = ets:update_element(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size,
+ ValidTotalSize - TotalSize}]),
+ delete_file_if_empty(
+ File,
+ State #msstate {
+ sum_valid_data = SumValid - TotalSize })
end;
_ when 1 < RefCount ->
ok = decrement_cache(DedupCacheEts, Guid),
- %% only update field, otherwise bad interaction with concurrent GC
- ok = index_update_fields(
- Guid, {#msg_location.ref_count, RefCount - 1}, State),
+ ok = Dec(),
State
end.
@@ -1618,8 +1614,7 @@ find_unremoved_messages_in_file(File,
lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
#msg_location { file = File, total_size = TotalSize,
- ref_count = 0,
- offset = Offset } ->
+ offset = Offset, ref_count = 0 } ->
ok = Index:delete(Guid, IndexState),
Acc;
#msg_location { file = File, total_size = TotalSize,