summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl67
1 files changed, 32 insertions, 35 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 62d1805a1c..96f4bf1e48 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -585,7 +585,7 @@ count_pending_acks(#vqstate { ram_pending_ack = RPA,
purge(State = #vqstate { len = Len }) ->
case is_pending_ack_empty(State) of
true ->
- {Len, purge_when_pending_acks(State)};
+ {Len, purge_when_no_pending_acks(State)};
false ->
{Len, purge_when_pending_acks(State)}
end.
@@ -1094,28 +1094,30 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA) ->
- lists:foldr(
- fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
- case SeqId < TransientThreshold andalso not IsPersistent of
- true -> {Filtered1,
- cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1], RRC, RB};
- false -> MsgStatus = m(beta_msg_status(M)),
- HaveMsg = msg_in_ram(MsgStatus),
- Size = msg_size(MsgStatus),
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA) orelse
- gb_trees:is_defined(SeqId, QPA)) of
- false -> {?QUEUE:in_r(MsgStatus, Filtered1),
- Delivers1, Acks1,
- RRC + one_if(HaveMsg),
- RB + one_if(HaveMsg) * Size};
- true -> Acc %% [0]
- end
- end
- end, {?QUEUE:new(), [], [], 0, 0}, List).
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) ->
+ {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
+ lists:foldr(
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
+ case SeqId < TransientThreshold andalso not IsPersistent of
+ true -> {Filtered1,
+ cons_if(not IsDelivered, SeqId, Delivers1),
+ [SeqId | Acks1], RRC, RB};
+ false -> MsgStatus = m(beta_msg_status(M)),
+ HaveMsg = msg_in_ram(MsgStatus),
+ Size = msg_size(MsgStatus),
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)) of
+ false -> {?QUEUE:in_r(MsgStatus, Filtered1),
+ Delivers1, Acks1,
+ RRC + one_if(HaveMsg),
+ RB + one_if(HaveMsg) * Size};
+ true -> Acc %% [0]
+ end
+ end
+ end, {?QUEUE:new(), [], [], 0, 0}, List),
+ {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
@@ -1913,13 +1915,12 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
{loaded, {MsgStatus, State2}}
end.
-maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
- State;
maybe_deltas_to_betas(State) ->
- maybe_deltas_to_betas(
- process_delivers_and_acks_fun(deliver_and_ack),
- State).
+ AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
+ maybe_deltas_to_betas(AfterFun, State).
+maybe_deltas_to_betas(_DelsAndAcksFun, State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
+ State;
maybe_deltas_to_betas(DelsAndAcksFun,
State = #vqstate {
q2 = Q2,
@@ -1941,17 +1942,13 @@ maybe_deltas_to_betas(DelsAndAcksFun,
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Q3a, Delivers, Acks, RamCountsInc, RamBytesInc} =
+ {Q3a, RamCountsInc, RamBytesInc, State1} =
betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, QPA),
- State1 = DelsAndAcksFun(Delivers, Acks,
- State #vqstate {
- index_state = IndexState1 }),
-
+ RPA, DPA, QPA, DelsAndAcksFun,
+ State #vqstate { index_state = IndexState1 }),
State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
ram_bytes = RamBytes + RamBytesInc,
disk_read_count = DiskReadCount + RamCountsInc},
-
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being