summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2015-09-09 15:35:18 -0700
committerMichael Klishin <michael@novemberain.com>2015-09-09 15:35:18 -0700
commit1393a458941e9b1193c51366b9a9bce9cf088d36 (patch)
tree7bdb43ad0b5978194e9c250743d4ff1e245ded87
parent53d96127b4444c52069da2d36470642ceef6a133 (diff)
parent8ed0c81e8d259149fc860ef3831fc32e9bd8632d (diff)
downloadrabbitmq-server-git-1393a458941e9b1193c51366b9a9bce9cf088d36.tar.gz
Merge pull request #305 from rabbitmq/rabbitmq-server-304rabbitmq_v3_5_5_rc2
refactors betas_from_index_entries/7 > betas_from_index_entries/4
-rw-r--r--src/rabbit_variable_queue.erl22
1 files changed, 11 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9448a71529..db95b8c844 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1060,7 +1060,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) ->
+betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
{Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
@@ -1072,9 +1072,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
false -> MsgStatus = m(beta_msg_status(M)),
HaveMsg = msg_in_ram(MsgStatus),
Size = msg_size(MsgStatus),
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA) orelse
- gb_trees:is_defined(SeqId, QPA)) of
+ case is_msg_in_pending_acks(SeqId, State) of
false -> {?QUEUE:in_r(MsgStatus, Filtered1),
Delivers1, Acks1,
RRC + one_if(HaveMsg),
@@ -1089,6 +1087,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
%% been stored in the QI, thus the message must have been in
%% qi_pending_ack, thus it must already have been in RAM.
+is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)).
+
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
@@ -1822,9 +1827,7 @@ next({delta, #delta{start_seq_id = SeqId,
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
- case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of
+ case is_msg_in_pending_acks(SeqId, State) of
false -> Next = {delta, Delta, Rest, State},
{value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
@@ -1996,9 +1999,6 @@ maybe_deltas_to_betas(DelsAndAcksFun,
index_state = IndexState,
ram_msg_count = RamMsgCount,
ram_bytes = RamBytes,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA,
disk_read_count = DiskReadCount,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
@@ -2011,7 +2011,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, State1} =
betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, QPA, DelsAndAcksFun,
+ DelsAndAcksFun,
State #vqstate { index_state = IndexState1 }),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
ram_bytes = RamBytes + RamBytesInc,