diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-09 19:55:48 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-09 19:55:48 +0200 |
| commit | a1ee6baacd270f7666320841b38f4302cf11abcf (patch) | |
| tree | d727bfb80568c0f8bd04e02a6004ca6ea78410b6 /src | |
| parent | 990bc9a25d2cbd190a47364b9b4f48bd3074c591 (diff) | |
| download | rabbitmq-server-git-a1ee6baacd270f7666320841b38f4302cf11abcf.tar.gz | |
refactors betas_from_index_entries/7 > betas_from_index_entries/4
Fixes #304
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 765e9c4d9f..bf4250598e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1061,7 +1061,7 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) -> +betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, @@ -1073,9 +1073,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun false -> MsgStatus = m(beta_msg_status(M)), HaveMsg = msg_in_ram(MsgStatus), Size = msg_size(MsgStatus), - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)) of + case is_msg_in_pending_acks(SeqId, State) of false -> {?QUEUE:in_r(MsgStatus, Filtered1), Delivers1, Acks1, RRC + one_if(HaveMsg), @@ -1090,6 +1088,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun %% been stored in the QI, thus the message must have been in %% qi_pending_ack, thus it must already have been in RAM. +is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, + disk_write_count = DPA, + qi_pending_ack = QPA }) -> + (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)). + expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); expand_delta(SeqId, #delta { start_seq_id = StartSeqId, @@ -1825,9 +1830,7 @@ next({delta, #delta{start_seq_id = SeqId, next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> - case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of + case is_msg_in_pending_acks(SeqId, State) of false -> Next = {delta, Delta, Rest, State}, {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) @@ -1999,9 +2002,6 @@ maybe_deltas_to_betas(DelsAndAcksFun, index_state = IndexState, ram_msg_count = RamMsgCount, ram_bytes = RamBytes, - ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, disk_read_count = DiskReadCount, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, @@ -2014,7 +2014,7 @@ maybe_deltas_to_betas(DelsAndAcksFun, IndexState), {Q3a, RamCountsInc, RamBytesInc, State1} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, QPA, DelsAndAcksFun, + DelsAndAcksFun, State #vqstate { index_state = IndexState1 }), State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, ram_bytes = RamBytes + RamBytesInc, |
