diff options
author | Luke Bakken <lbakken@pivotal.io> | 2019-08-29 15:53:06 -0700 |
---|---|---|
committer | Luke Bakken <luke@bakken.io> | 2019-09-09 14:40:20 -0700 |
commit | 29f9c113d5eb7b7434804d8543897418e191fbb1 (patch) | |
tree | 83ab5038cce1e759d6fe50d47fc9e35816d0cd6a | |
parent | 283b34df1730a51671faa3d09f4eb29fea716d3b (diff) | |
download | rabbitmq-server-git-lrb-experimental-delta.tar.gz |
debugginglrb-experimental-delta
-rw-r--r-- | src/rabbit_variable_queue.erl | 55 |
1 files changed, 30 insertions, 25 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ee0f013e0e..eb48fac3c8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1305,7 +1305,7 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> +betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State0) -> {Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, @@ -1317,7 +1317,7 @@ betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> false -> MsgStatus = m(beta_msg_status(M)), HaveMsg = msg_in_ram(MsgStatus), Size = msg_size(MsgStatus), - case is_msg_in_pending_acks(SeqId, State) of + case is_msg_in_pending_acks(SeqId, State0) of false -> {?QUEUE:in_r(MsgStatus, Filtered1), Delivers1, Acks1, RRC + one_if(HaveMsg), @@ -1328,8 +1328,8 @@ betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> end end end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State), - TransientCount, TransientBytes}. + State1 = DelsAndAcksFun(Delivers, Acks, State0), + {Filtered, RamReadyCount, RamBytes, State1, TransientCount, TransientBytes}. %% [0] We don't increase RamBytes here, even though it pertains to %% unacked messages too, since if HaveMsg then the message must have %% been stored in the QI, thus the message must have been in @@ -2347,7 +2347,7 @@ next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqId}, State}, IndexState) -> next(istate(delta, State), IndexState); next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> + end_seq_id = SeqIdEnd} = Delta0, State}, IndexState) -> SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), SeqId1 = lists:min([SeqIdB, SeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), @@ -2615,7 +2615,7 @@ maybe_deltas_to_betas(_DelsAndAcksFun, maybe_deltas_to_betas(DelsAndAcksFun, State = #vqstate { q2 = Q2, - delta = Delta, + delta = Delta0, q3 = Q3, index_state = IndexState, ram_msg_count = RamMsgCount, @@ -2626,45 +2626,50 @@ maybe_deltas_to_betas(DelsAndAcksFun, #delta { start_seq_id = DeltaSeqId, count = DeltaCount, transient = Transient, - end_seq_id = DeltaSeqIdEnd } = Delta, + end_seq_id = DeltaSeqIdEnd } = Delta0, + + rabbit_log:debug("@@@@@@@@ maybe_deltas_to_betas Delta0 ~p", [Delta0]), + + Delta1 = d(Delta0), + DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} = betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State #vqstate { index_state = IndexState1 }), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, - ram_bytes = RamBytes + RamBytesInc, - disk_read_count = DiskReadCount + RamCountsInc }, + State2 = State1#vqstate{ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc, + disk_read_count = DiskReadCount + RamCountsInc}, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold - maybe_deltas_to_betas( - DelsAndAcksFun, - State2 #vqstate { - delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); + Delta2 = d(Delta1#delta{start_seq_id = DeltaSeqId1}), + State3 = State2#vqstate{delta = Delta2}, + maybe_deltas_to_betas(DelsAndAcksFun, State3); Q3aLen -> Q3b = ?QUEUE:join(Q3, Q3a), case DeltaCount - Q3aLen of 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 - State2 #vqstate { q2 = ?QUEUE:new(), - delta = ?BLANK_DELTA, - q3 = ?QUEUE:join(Q3b, Q2), - delta_transient_bytes = 0}; + State2#vqstate{q2 = ?QUEUE:new(), + delta = ?BLANK_DELTA, + q3 = ?QUEUE:join(Q3b, Q2), + delta_transient_bytes = 0}; N when N > 0 -> - Delta1 = d(#delta { start_seq_id = DeltaSeqId1, - count = N, - transient = Transient - TransientCount, - end_seq_id = DeltaSeqIdEnd }), - State2 #vqstate { delta = Delta1, - q3 = Q3b, - delta_transient_bytes = DeltaTransientBytes - TransientBytes } + Delta3 = d(#delta{start_seq_id = DeltaSeqId1, + count = N, + transient = Transient - TransientCount, + end_seq_id = DeltaSeqIdEnd}), + State2#vqstate{delta = Delta3, + q3 = Q3b, + delta_transient_bytes = DeltaTransientBytes - TransientBytes} end end. |