summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl128
1 files changed, 62 insertions, 66 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2ea6d616c4..fc49b0e77e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -756,30 +756,7 @@ handle_cast({write, CRef, MsgId},
case update_flying(-1, MsgId, CRef, State) of
process ->
[{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
- noreply(
- case write_action(
- should_mask_action(CRef, MsgId, State), MsgId, State) of
- {write, State1} ->
- write_message(CRef, MsgId, Msg, State1);
- {ignore, CurFile,
- State1 = #msstate { current_file = CurFile }} ->
- State1;
- {ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts,
- {MsgId, Msg, 0}),
- State1;
- {confirm, CurFile,
- State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, MsgId, State1);
- {confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts,
- {MsgId, Msg, 0}),
- update_pending_confirms(
- fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(gb_sets:singleton(MsgId), written),
- CTM
- end, CRef, State1)
- end);
+ noreply(write_message(MsgId, Msg, CRef, State));
ignore ->
%% A 'remove' has already been issued and eliminated the
%% 'write'. If all writes get eliminated,
@@ -789,8 +766,8 @@ handle_cast({write, CRef, MsgId},
%% the message can continue to be done client side, from
%% either the cache or the non-current files. If the
%% message *is* in the current file then the cache entry
- %% will be removed by the normal logic for that,
- %% e.g. above and during file rolling.
+ %% will be removed by the normal logic for that in
+ %% write_message/4 and maybe_roll_to_new_file/2.
case index_lookup(MsgId, State) of
[#msg_location { file = File }]
when File == State #msstate.current_file ->
@@ -973,8 +950,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, MsgId, Msg, State) ->
- write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
+write_message(MsgId, Msg, CRef,
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
+ {write, State1} ->
+ write_message(MsgId, Msg,
+ record_pending_confirm(CRef, MsgId, State1));
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, MsgId, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
+ end, CRef, State1)
+ end.
+
+remove_message(MsgId, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg whilst
+ %% it's being GC'd.
+ %%
+ %% ASSERTION: [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from cur_file_cache_ets here because
+ %% there may be further writes in the mailbox for the
+ %% same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
+ end.
write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -1072,44 +1106,6 @@ contains_message(MsgId, From,
end
end.
-remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this msg whilst
- %% it's being GC'd.
- %%
- %% ASSERTION: [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }}
- when RefCount > 0 ->
- %% only update field, otherwise bad interaction with
- %% concurrent GC
- Dec = fun () -> index_update_ref_count(
- MsgId, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from cur_file_cache_ets here because
- %% there may be further writes in the mailbox for the
- %% same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(
- File, -TotalSize, State))
- end;
- _ -> ok = Dec(),
- State
- end
- end.
-
add_to_pending_gc_completion(
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
State #msstate { pending_gc_completion =