diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 97 |
1 files changed, 50 insertions, 47 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 96f4bf1e48..3d159850a9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -550,38 +550,6 @@ delete_and_terminate(_Reason, State) -> delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). -purge_when_pending_acks(State) -> - AfterFun = process_delivers_and_acks_fun(deliver_and_ack), - State1 = purge1(AfterFun, State), - a(State1). - -purge_when_no_pending_acks(State) -> - AfterFun = process_delivers_and_acks_fun(none), - State1 = purge1(AfterFun, State), - a(wipe_index(State1)). - -purge1(AfterFun, State = #vqstate { q4 = Q4}) -> - State1 = remove_queue_entries(Q4, AfterFun, State), - - State2 = #vqstate { q1 = Q1 } = - purge_betas_and_deltas(AfterFun, State1 #vqstate { q4 = ?QUEUE:new() }), - - State3 = remove_queue_entries(Q1, AfterFun, State2), - - a(State3 #vqstate { q1 = ?QUEUE:new() }). - -wipe_index(State = #vqstate{index_state = IndexState }) -> - State #vqstate { index_state = - rabbit_queue_index:reset_state(IndexState) }. - -is_pending_ack_empty(State) -> - count_pending_acks(State) =:= 0. - -count_pending_acks(#vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). - purge(State = #vqstate { len = Len }) -> case is_pending_ack_empty(State) of true -> @@ -1118,7 +1086,6 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun end end, {?QUEUE:new(), [], [], 0, 0}, List), {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. - %% [0] We don't increase RamBytes here, even though it pertains to %% unacked messages too, since if HaveMsg then the message must have %% been stored in the QI, thus the message must have been in @@ -1344,6 +1311,42 @@ remove(AckRequired, MsgStatus = #msg_status { State2 #vqstate {out_counter = OutCount + 1, index_state = IndexState2})}. +%%---------------------------------------------------------------------------- +%% Helpers for Public API purge/1 function +%%---------------------------------------------------------------------------- + +purge_when_pending_acks(State) -> + AfterFun = process_delivers_and_acks_fun(deliver_and_ack), + State1 = purge1(AfterFun, State), + a(State1). + +purge_when_no_pending_acks(State) -> + AfterFun = process_delivers_and_acks_fun(none), + State1 = purge1(AfterFun, State), + a(wipe_index(State1)). + +purge1(AfterFun, State = #vqstate { q4 = Q4}) -> + State1 = remove_queue_entries(Q4, AfterFun, State), + + State2 = #vqstate { q1 = Q1 } = + purge_betas_and_deltas(AfterFun, State1 #vqstate { q4 = ?QUEUE:new() }), + + State3 = remove_queue_entries(Q1, AfterFun, State2), + + a(State3 #vqstate { q1 = ?QUEUE:new() }). + +wipe_index(State = #vqstate{index_state = IndexState }) -> + State #vqstate { index_state = + rabbit_queue_index:reset_state(IndexState) }. + +is_pending_ack_empty(State) -> + count_pending_acks(State) =:= 0. + +count_pending_acks(#vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). + purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) -> case ?QUEUE:is_empty(Q3) of true -> State; @@ -1354,20 +1357,6 @@ purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) -> State1#vqstate{q3 = ?QUEUE:new()})) end. -process_delivers_and_acks_fun(deliver_and_ack) -> - fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> - IndexState1 = qi_deliver_and_ack(Delivers, Acks, IndexState), - State #vqstate { index_state = IndexState1 } - end; -process_delivers_and_acks_fun(_) -> - fun (_, _, State) -> - State - end. - -qi_deliver_and_ack(Delivers, Acks, IndexState) -> - rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState)). - remove_queue_entries(Q, DelsAndAcksFun, State = #vqstate{msg_store_clients = MSCState}) -> {MsgIdsByStore, Delivers, Acks, State1} = @@ -1391,6 +1380,20 @@ remove_queue_entries1( cons_if(IndexOnDisk, SeqId, Acks), stats({-1, 0}, {MsgStatus, none}, State)}. +process_delivers_and_acks_fun(deliver_and_ack) -> + fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> + IndexState1 = qi_deliver_and_ack(Delivers, Acks, IndexState), + State #vqstate { index_state = IndexState1 } + end; +process_delivers_and_acks_fun(_) -> + fun (_, _, State) -> + State + end. + +qi_deliver_and_ack(Delivers, Acks, IndexState) -> + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)). + %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- |
