summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Bakken <lbakken@pivotal.io>2019-08-29 15:53:06 -0700
committerLuke Bakken <luke@bakken.io>2019-09-09 14:40:20 -0700
commit29f9c113d5eb7b7434804d8543897418e191fbb1 (patch)
tree83ab5038cce1e759d6fe50d47fc9e35816d0cd6a
parent283b34df1730a51671faa3d09f4eb29fea716d3b (diff)
downloadrabbitmq-server-git-lrb-experimental-delta.tar.gz
-rw-r--r--src/rabbit_variable_queue.erl55
1 files changed, 30 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ee0f013e0e..eb48fac3c8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1305,7 +1305,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
+betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State0) ->
{Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
@@ -1317,7 +1317,7 @@ betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
false -> MsgStatus = m(beta_msg_status(M)),
HaveMsg = msg_in_ram(MsgStatus),
Size = msg_size(MsgStatus),
- case is_msg_in_pending_acks(SeqId, State) of
+ case is_msg_in_pending_acks(SeqId, State0) of
false -> {?QUEUE:in_r(MsgStatus, Filtered1),
Delivers1, Acks1,
RRC + one_if(HaveMsg),
@@ -1328,8 +1328,8 @@ betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
end
end
end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List),
- {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State),
- TransientCount, TransientBytes}.
+ State1 = DelsAndAcksFun(Delivers, Acks, State0),
+ {Filtered, RamReadyCount, RamBytes, State1, TransientCount, TransientBytes}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
@@ -2347,7 +2347,7 @@ next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqId}, State}, IndexState) ->
next(istate(delta, State), IndexState);
next({delta, #delta{start_seq_id = SeqId,
- end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ end_seq_id = SeqIdEnd} = Delta0, State}, IndexState) ->
SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
@@ -2615,7 +2615,7 @@ maybe_deltas_to_betas(_DelsAndAcksFun,
maybe_deltas_to_betas(DelsAndAcksFun,
State = #vqstate {
q2 = Q2,
- delta = Delta,
+ delta = Delta0,
q3 = Q3,
index_state = IndexState,
ram_msg_count = RamMsgCount,
@@ -2626,45 +2626,50 @@ maybe_deltas_to_betas(DelsAndAcksFun,
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
transient = Transient,
- end_seq_id = DeltaSeqIdEnd } = Delta,
+ end_seq_id = DeltaSeqIdEnd } = Delta0,
+
+ rabbit_log:debug("@@@@@@@@ maybe_deltas_to_betas Delta0 ~p", [Delta0]),
+
+ Delta1 = d(Delta0),
+
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
+
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
betas_from_index_entries(List, TransientThreshold,
DelsAndAcksFun,
State #vqstate { index_state = IndexState1 }),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
- ram_bytes = RamBytes + RamBytesInc,
- disk_read_count = DiskReadCount + RamCountsInc },
+ State2 = State1#vqstate{ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc,
+ disk_read_count = DiskReadCount + RamCountsInc},
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
- maybe_deltas_to_betas(
- DelsAndAcksFun,
- State2 #vqstate {
- delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
+ Delta2 = d(Delta1#delta{start_seq_id = DeltaSeqId1}),
+ State3 = State2#vqstate{delta = Delta2},
+ maybe_deltas_to_betas(DelsAndAcksFun, State3);
Q3aLen ->
Q3b = ?QUEUE:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State2 #vqstate { q2 = ?QUEUE:new(),
- delta = ?BLANK_DELTA,
- q3 = ?QUEUE:join(Q3b, Q2),
- delta_transient_bytes = 0};
+ State2#vqstate{q2 = ?QUEUE:new(),
+ delta = ?BLANK_DELTA,
+ q3 = ?QUEUE:join(Q3b, Q2),
+ delta_transient_bytes = 0};
N when N > 0 ->
- Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
- count = N,
- transient = Transient - TransientCount,
- end_seq_id = DeltaSeqIdEnd }),
- State2 #vqstate { delta = Delta1,
- q3 = Q3b,
- delta_transient_bytes = DeltaTransientBytes - TransientBytes }
+ Delta3 = d(#delta{start_seq_id = DeltaSeqId1,
+ count = N,
+ transient = Transient - TransientCount,
+ end_seq_id = DeltaSeqIdEnd}),
+ State2#vqstate{delta = Delta3,
+ q3 = Q3b,
+ delta_transient_bytes = DeltaTransientBytes - TransientBytes}
end
end.