summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-09-09 19:55:48 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-09-09 19:55:48 +0200
commita1ee6baacd270f7666320841b38f4302cf11abcf (patch)
treed727bfb80568c0f8bd04e02a6004ca6ea78410b6 /src
parent990bc9a25d2cbd190a47364b9b4f48bd3074c591 (diff)
downloadrabbitmq-server-git-a1ee6baacd270f7666320841b38f4302cf11abcf.tar.gz
refactors betas_from_index_entries/7 > betas_from_index_entries/4
Fixes #304
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl22
1 files changed, 11 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 765e9c4d9f..bf4250598e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1061,7 +1061,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) ->
+betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
{Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
@@ -1073,9 +1073,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
false -> MsgStatus = m(beta_msg_status(M)),
HaveMsg = msg_in_ram(MsgStatus),
Size = msg_size(MsgStatus),
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA) orelse
- gb_trees:is_defined(SeqId, QPA)) of
+ case is_msg_in_pending_acks(SeqId, State) of
false -> {?QUEUE:in_r(MsgStatus, Filtered1),
Delivers1, Acks1,
RRC + one_if(HaveMsg),
@@ -1090,6 +1088,13 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
%% been stored in the QI, thus the message must have been in
%% qi_pending_ack, thus it must already have been in RAM.
+is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_write_count = DPA,
+ qi_pending_ack = QPA }) ->
+ (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)).
+
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
@@ -1825,9 +1830,7 @@ next({delta, #delta{start_seq_id = SeqId,
next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
- case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of
+ case is_msg_in_pending_acks(SeqId, State) of
false -> Next = {delta, Delta, Rest, State},
{value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
@@ -1999,9 +2002,6 @@ maybe_deltas_to_betas(DelsAndAcksFun,
index_state = IndexState,
ram_msg_count = RamMsgCount,
ram_bytes = RamBytes,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA,
disk_read_count = DiskReadCount,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
@@ -2014,7 +2014,7 @@ maybe_deltas_to_betas(DelsAndAcksFun,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, State1} =
betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, QPA, DelsAndAcksFun,
+ DelsAndAcksFun,
State #vqstate { index_state = IndexState1 }),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
ram_bytes = RamBytes + RamBytesInc,