diff options
| author | Michael Klishin <michael@novemberain.com> | 2015-09-09 15:35:18 -0700 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2015-09-09 15:35:18 -0700 |
| commit | 1393a458941e9b1193c51366b9a9bce9cf088d36 (patch) | |
| tree | 7bdb43ad0b5978194e9c250743d4ff1e245ded87 | |
| parent | 53d96127b4444c52069da2d36470642ceef6a133 (diff) | |
| parent | 8ed0c81e8d259149fc860ef3831fc32e9bd8632d (diff) | |
| download | rabbitmq-server-git-1393a458941e9b1193c51366b9a9bce9cf088d36.tar.gz | |
Merge pull request #305 from rabbitmq/rabbitmq-server-304rabbitmq_v3_5_5_rc2
refactors betas_from_index_entries/7 > betas_from_index_entries/4
| -rw-r--r-- | src/rabbit_variable_queue.erl | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9448a71529..db95b8c844 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1060,7 +1060,7 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) -> +betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, @@ -1072,9 +1072,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun false -> MsgStatus = m(beta_msg_status(M)), HaveMsg = msg_in_ram(MsgStatus), Size = msg_size(MsgStatus), - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)) of + case is_msg_in_pending_acks(SeqId, State) of false -> {?QUEUE:in_r(MsgStatus, Filtered1), Delivers1, Acks1, RRC + one_if(HaveMsg), @@ -1089,6 +1087,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun %% been stored in the QI, thus the message must have been in %% qi_pending_ack, thus it must already have been in RAM. +is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)). + expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) -> d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }); expand_delta(SeqId, #delta { start_seq_id = StartSeqId, @@ -1822,9 +1827,7 @@ next({delta, #delta{start_seq_id = SeqId, next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> - case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse - gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of + case is_msg_in_pending_acks(SeqId, State) of false -> Next = {delta, Delta, Rest, State}, {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) @@ -1996,9 +1999,6 @@ maybe_deltas_to_betas(DelsAndAcksFun, index_state = IndexState, ram_msg_count = RamMsgCount, ram_bytes = RamBytes, - ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, disk_read_count = DiskReadCount, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, @@ -2011,7 +2011,7 @@ maybe_deltas_to_betas(DelsAndAcksFun, IndexState), {Q3a, RamCountsInc, RamBytesInc, State1} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, QPA, DelsAndAcksFun, + DelsAndAcksFun, State #vqstate { index_state = IndexState1 }), State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, ram_bytes = RamBytes + RamBytesInc, |
