diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-07 12:59:39 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-07 12:59:39 +0200 |
| commit | 6e6651fb1190729e3c5dda5dff21812ed1c95f58 (patch) | |
| tree | 464957c8fa0317bd841b4a14b94002e842935250 /src | |
| parent | 99634fcbbe4c5a14196a1ead4c4546cdfa4612df (diff) | |
| download | rabbitmq-server-git-6e6651fb1190729e3c5dda5dff21812ed1c95f58.tar.gz | |
uses index purge callback in betas_from_index_entries
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 62d1805a1c..96f4bf1e48 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -585,7 +585,7 @@ count_pending_acks(#vqstate { ram_pending_ack = RPA, purge(State = #vqstate { len = Len }) -> case is_pending_ack_empty(State) of true -> - {Len, purge_when_pending_acks(State)}; + {Len, purge_when_no_pending_acks(State)}; false -> {Len, purge_when_pending_acks(State)} end. @@ -1094,28 +1094,30 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA) -> - lists:foldr( - fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> - case SeqId < TransientThreshold andalso not IsPersistent of - true -> {Filtered1, - cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1], RRC, RB}; - false -> MsgStatus = m(beta_msg_status(M)), - HaveMsg = msg_in_ram(MsgStatus), - Size = msg_size(MsgStatus), - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)) of - false -> {?QUEUE:in_r(MsgStatus, Filtered1), - Delivers1, Acks1, - RRC + one_if(HaveMsg), - RB + one_if(HaveMsg) * Size}; - true -> Acc %% [0] - end - end - end, {?QUEUE:new(), [], [], 0, 0}, List). +betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) -> + {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = + lists:foldr( + fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, + {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> + case SeqId < TransientThreshold andalso not IsPersistent of + true -> {Filtered1, + cons_if(not IsDelivered, SeqId, Delivers1), + [SeqId | Acks1], RRC, RB}; + false -> MsgStatus = m(beta_msg_status(M)), + HaveMsg = msg_in_ram(MsgStatus), + Size = msg_size(MsgStatus), + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)) of + false -> {?QUEUE:in_r(MsgStatus, Filtered1), + Delivers1, Acks1, + RRC + one_if(HaveMsg), + RB + one_if(HaveMsg) * Size}; + true -> Acc %% [0] + end + end + end, {?QUEUE:new(), [], [], 0, 0}, List), + {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. %% [0] We don't increase RamBytes here, even though it pertains to %% unacked messages too, since if HaveMsg then the message must have @@ -1913,13 +1915,12 @@ fetch_from_q3(State = #vqstate { q1 = Q1, {loaded, {MsgStatus, State2}} end. -maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> - State; maybe_deltas_to_betas(State) -> - maybe_deltas_to_betas( - process_delivers_and_acks_fun(deliver_and_ack), - State). + AfterFun = process_delivers_and_acks_fun(deliver_and_ack), + maybe_deltas_to_betas(AfterFun, State). +maybe_deltas_to_betas(_DelsAndAcksFun, State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> + State; maybe_deltas_to_betas(DelsAndAcksFun, State = #vqstate { q2 = Q2, @@ -1941,17 +1942,13 @@ maybe_deltas_to_betas(DelsAndAcksFun, DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, Delivers, Acks, RamCountsInc, RamBytesInc} = + {Q3a, RamCountsInc, RamBytesInc, State1} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, QPA), - State1 = DelsAndAcksFun(Delivers, Acks, - State #vqstate { - index_state = IndexState1 }), - + RPA, DPA, QPA, DelsAndAcksFun, + State #vqstate { index_state = IndexState1 }), State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, ram_bytes = RamBytes + RamBytesInc, disk_read_count = DiskReadCount + RamCountsInc}, - case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being |
