summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-10 19:37:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-10 19:37:00 +0100
commit02349e945480aff4e6c9a0e0a9a0c6a9153d7900 (patch)
tree1dd519ec4ef9c92fbd058da386ce9ddb23a771e1
parent3adc1ab43489a23b5ef00fc1c82fe34c7d8d096d (diff)
downloadrabbitmq-server-git-02349e945480aff4e6c9a0e0a9a0c6a9153d7900.tar.gz
refactoring: extract core of 'write' handler
similar to what we already did for 'remove' Also, don't add the 'dying client' marker message to the pending confirms. While that does no harm, it is clearly not right.
-rw-r--r--src/rabbit_msg_store.erl128
1 files changed, 62 insertions, 66 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2ea6d616c4..fc49b0e77e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -756,30 +756,7 @@ handle_cast({write, CRef, MsgId},
case update_flying(-1, MsgId, CRef, State) of
process ->
[{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
- noreply(
- case write_action(
- should_mask_action(CRef, MsgId, State), MsgId, State) of
- {write, State1} ->
- write_message(CRef, MsgId, Msg, State1);
- {ignore, CurFile,
- State1 = #msstate { current_file = CurFile }} ->
- State1;
- {ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts,
- {MsgId, Msg, 0}),
- State1;
- {confirm, CurFile,
- State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, MsgId, State1);
- {confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts,
- {MsgId, Msg, 0}),
- update_pending_confirms(
- fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(gb_sets:singleton(MsgId), written),
- CTM
- end, CRef, State1)
- end);
+ noreply(write_message(MsgId, Msg, CRef, State));
ignore ->
%% A 'remove' has already been issued and eliminated the
%% 'write'. If all writes get eliminated,
@@ -789,8 +766,8 @@ handle_cast({write, CRef, MsgId},
%% the message can continue to be done client side, from
%% either the cache or the non-current files. If the
%% message *is* in the current file then the cache entry
- %% will be removed by the normal logic for that,
- %% e.g. above and during file rolling.
+ %% will be removed by the normal logic for that in
+ %% write_message/4 and maybe_roll_to_new_file/2.
case index_lookup(MsgId, State) of
[#msg_location { file = File }]
when File == State #msstate.current_file ->
@@ -973,8 +950,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, MsgId, Msg, State) ->
- write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
+write_message(MsgId, Msg, CRef,
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
+ {write, State1} ->
+ write_message(MsgId, Msg,
+ record_pending_confirm(CRef, MsgId, State1));
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, MsgId, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
+ end, CRef, State1)
+ end.
+
+remove_message(MsgId, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg whilst
+ %% it's being GC'd.
+ %%
+ %% ASSERTION: [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from cur_file_cache_ets here because
+ %% there may be further writes in the mailbox for the
+ %% same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
+ end.
write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -1072,44 +1106,6 @@ contains_message(MsgId, From,
end
end.
-remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this msg whilst
- %% it's being GC'd.
- %%
- %% ASSERTION: [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }}
- when RefCount > 0 ->
- %% only update field, otherwise bad interaction with
- %% concurrent GC
- Dec = fun () -> index_update_ref_count(
- MsgId, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from cur_file_cache_ets here because
- %% there may be further writes in the mailbox for the
- %% same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(
- File, -TotalSize, State))
- end;
- _ -> ok = Dec(),
- State
- end
- end.
-
add_to_pending_gc_completion(
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
State #msstate { pending_gc_completion =