diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-06 12:45:14 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-06 12:45:14 +0200 |
| commit | 99634fcbbe4c5a14196a1ead4c4546cdfa4612df (patch) | |
| tree | 15cc3f400b241976d7a4893394d9e82062833493 | |
| parent | 1d9a92b20494a4c096a36573ab4595626ff741da (diff) | |
| download | rabbitmq-server-git-99634fcbbe4c5a14196a1ead4c4546cdfa4612df.tar.gz | |
refactors purge
| -rw-r--r-- | src/rabbit_variable_queue.erl | 224 |
1 files changed, 103 insertions, 121 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f37e023bab..62d1805a1c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -535,9 +535,9 @@ terminate(_Reason, State) -> delete_and_terminate(_Reason, State) -> %% There is no need to interact with qi at all - which we do as %% part of 'purge' and 'purge_pending_ack', other than deleting - %% it. That's why the last parameter to those function is delete + %% it. That's why the last parameter to those functions is delete %% and terminate. - {_PurgeCount, State1} = purge(State, true), + {_PurgeCount, State1} = purge(State), State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = purge_pending_ack(false, State1, true), case MSCStateP of @@ -550,57 +550,45 @@ delete_and_terminate(_Reason, State) -> delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). -%% the default purge/1 case is to purge the queue when we are not part -%% of a delete_and_terminate case. -purge(State) -> - purge(State, false). +purge_when_pending_acks(State) -> + AfterFun = process_delivers_and_acks_fun(deliver_and_ack), + State1 = purge1(AfterFun, State), + a(State1). -purge(State = #vqstate { q4 = Q4, - len = Len }, - DeleteAndTerminate) -> - {IndexWiped, State1} = remove_queue_entries(Q4, State, DeleteAndTerminate), +purge_when_no_pending_acks(State) -> + AfterFun = process_delivers_and_acks_fun(none), + State1 = purge1(AfterFun, State), + a(wipe_index(State1)). + +purge1(AfterFun, State = #vqstate { q4 = Q4}) -> + State1 = remove_queue_entries(Q4, AfterFun, State), - %% purge_betas_and_deltas takes care of wiping q3. State2 = #vqstate { q1 = Q1 } = - purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }, - DeleteAndTerminate), - - {_, State3 = #vqstate { q2 = Q2 }} = - remove_queue_entries(Q1, State2, DeleteAndTerminate), - - State4 = - case IndexWiped of - true -> - %% If IndexWiped we need to update the queue length - %% and all the related queue stats since succesive - %% calls to remove_queue_entries after the first one - %% are not decreasing stats. - %% - %% Also IndexWiped means that there were no pending - %% acks, so the queue is empty as well, letting us - %% reset len. - %% - %% Also IndexWiped means that the delta can be set to - %% ?BLANK_DELTA since the queue is empty as well. - %% - %% q2 needs to be wiped as well, since if the index - %% was wiped, then maybe_deltas_to_betas is not - %% called, possibly leaving messages inside q2. - {_, S} = remove_queue_entries(Q2, State3, DeleteAndTerminate), - S#vqstate{ len = 0, - ram_msg_count = 0, - persistent_count = 0, - bytes = 0, - unacked_bytes = 0, - ram_bytes = 0, - persistent_bytes = 0, - delta = ?BLANK_DELTA, - q2 = ?QUEUE:new() }; - false -> - State3 - end, - - {Len, a(State4 #vqstate { q1 = ?QUEUE:new() })}. + purge_betas_and_deltas(AfterFun, State1 #vqstate { q4 = ?QUEUE:new() }), + + State3 = remove_queue_entries(Q1, AfterFun, State2), + + a(State3 #vqstate { q1 = ?QUEUE:new() }). + +wipe_index(State = #vqstate{index_state = IndexState }) -> + State #vqstate { index_state = + rabbit_queue_index:reset_state(IndexState) }. + +is_pending_ack_empty(State) -> + count_pending_acks(State) =:= 0. + +count_pending_acks(#vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). + +purge(State = #vqstate { len = Len }) -> + case is_pending_ack_empty(State) of + true -> + {Len, purge_when_pending_acks(State)}; + false -> + {Len, purge_when_pending_acks(State)} + end. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -1106,32 +1094,29 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> - {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = - lists:foldr( - fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> - case SeqId < TransientThreshold andalso not IsPersistent of - true -> {Filtered1, - cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1], RRC, RB}; - false -> MsgStatus = m(beta_msg_status(M)), - HaveMsg = msg_in_ram(MsgStatus), - Size = msg_size(MsgStatus), - case (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)) of - false -> {?QUEUE:in_r(MsgStatus, Filtered1), - Delivers1, Acks1, - RRC + one_if(HaveMsg), - RB + one_if(HaveMsg) * Size}; - true -> Acc %% [0] - end - end - end, {?QUEUE:new(), [], [], 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, - rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. +betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA) -> + lists:foldr( + fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, + {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) -> + case SeqId < TransientThreshold andalso not IsPersistent of + true -> {Filtered1, + cons_if(not IsDelivered, SeqId, Delivers1), + [SeqId | Acks1], RRC, RB}; + false -> MsgStatus = m(beta_msg_status(M)), + HaveMsg = msg_in_ram(MsgStatus), + Size = msg_size(MsgStatus), + case (gb_trees:is_defined(SeqId, RPA) orelse + gb_trees:is_defined(SeqId, DPA) orelse + gb_trees:is_defined(SeqId, QPA)) of + false -> {?QUEUE:in_r(MsgStatus, Filtered1), + Delivers1, Acks1, + RRC + one_if(HaveMsg), + RB + one_if(HaveMsg) * Size}; + true -> Acc %% [0] + end + end + end, {?QUEUE:new(), [], [], 0, 0}, List). + %% [0] We don't increase RamBytes here, even though it pertains to %% unacked messages too, since if HaveMsg then the message must have %% been stored in the QI, thus the message must have been in @@ -1357,44 +1342,39 @@ remove(AckRequired, MsgStatus = #msg_status { State2 #vqstate {out_counter = OutCount + 1, index_state = IndexState2})}. -purge_betas_and_deltas(State = #vqstate { q3 = Q3 }, DeleteAndTerminate) -> +purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) -> case ?QUEUE:is_empty(Q3) of true -> State; - false -> {WipedIndex, State1} = - remove_queue_entries(Q3, State, DeleteAndTerminate), - State2 = State1#vqstate{q3 = ?QUEUE:new()}, - case WipedIndex of - true -> - %% q3 is already empty, so we can return - %% already. - State2; - false -> - purge_betas_and_deltas( - maybe_deltas_to_betas(State2), DeleteAndTerminate) - end + false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State), + purge_betas_and_deltas(DelsAndAcksFun, + maybe_deltas_to_betas( + DelsAndAcksFun, + State1#vqstate{q3 = ?QUEUE:new()})) + end. + +process_delivers_and_acks_fun(deliver_and_ack) -> + fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> + IndexState1 = qi_deliver_and_ack(Delivers, Acks, IndexState), + State #vqstate { index_state = IndexState1 } + end; +process_delivers_and_acks_fun(_) -> + fun (_, _, State) -> + State end. -remove_queue_entries(Q, State) -> - remove_queue_entries(Q, State, false). +qi_deliver_and_ack(Delivers, Acks, IndexState) -> + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)). -remove_queue_entries(Q, State = #vqstate{index_state = IndexState, - msg_store_clients = MSCState}, - DeleteAndTerminate) -> +remove_queue_entries(Q, DelsAndAcksFun, + State = #vqstate{msg_store_clients = MSCState}) -> {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - {WipedIndex, IndexState1} = - case is_pending_ack_empty(State1) orelse DeleteAndTerminate of - true -> - {true, rabbit_queue_index:reset_state(IndexState)}; - _ -> - {false, rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState))} - end, - {WipedIndex, State1#vqstate{index_state = IndexState1}}. + DelsAndAcksFun(Delivers, Acks, State1). remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, @@ -1409,14 +1389,6 @@ remove_queue_entries1( cons_if(IndexOnDisk, SeqId, Acks), stats({-1, 0}, {MsgStatus, none}, State)}. -is_pending_ack_empty(State) -> - count_pending_acks(State) =:= 0. - -count_pending_acks(#vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). - %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1943,7 +1915,13 @@ fetch_from_q3(State = #vqstate { q1 = Q1, maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; -maybe_deltas_to_betas(State = #vqstate { +maybe_deltas_to_betas(State) -> + maybe_deltas_to_betas( + process_delivers_and_acks_fun(deliver_and_ack), + State). + +maybe_deltas_to_betas(DelsAndAcksFun, + State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, @@ -1963,20 +1941,24 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, RamCountsInc, RamBytesInc, IndexState2} = + {Q3a, Delivers, Acks, RamCountsInc, RamBytesInc} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, QPA, IndexState1), - State1 = State #vqstate { index_state = IndexState2, - ram_msg_count = RamMsgCount + RamCountsInc, - ram_bytes = RamBytes + RamBytesInc, - disk_read_count = DiskReadCount + RamCountsInc}, + RPA, DPA, QPA), + State1 = DelsAndAcksFun(Delivers, Acks, + State #vqstate { + index_state = IndexState1 }), + + State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc, + disk_read_count = DiskReadCount + RamCountsInc}, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold maybe_deltas_to_betas( - State1 #vqstate { + DelsAndAcksFun, + State2 #vqstate { delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); Q3aLen -> Q3b = ?QUEUE:join(Q3, Q3a), @@ -1984,14 +1966,14 @@ maybe_deltas_to_betas(State = #vqstate { 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { q2 = ?QUEUE:new(), + State2 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, q3 = ?QUEUE:join(Q3b, Q2) }; N when N > 0 -> Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, end_seq_id = DeltaSeqIdEnd }), - State1 #vqstate { delta = Delta1, + State2 #vqstate { delta = Delta1, q3 = Q3b } end end. |
