summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl97
1 files changed, 50 insertions, 47 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 96f4bf1e48..3d159850a9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -550,38 +550,6 @@ delete_and_terminate(_Reason, State) ->
delete_crashed(#amqqueue{name = QName}) ->
ok = rabbit_queue_index:erase(QName).
-purge_when_pending_acks(State) ->
- AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
- State1 = purge1(AfterFun, State),
- a(State1).
-
-purge_when_no_pending_acks(State) ->
- AfterFun = process_delivers_and_acks_fun(none),
- State1 = purge1(AfterFun, State),
- a(wipe_index(State1)).
-
-purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
- State1 = remove_queue_entries(Q4, AfterFun, State),
-
- State2 = #vqstate { q1 = Q1 } =
- purge_betas_and_deltas(AfterFun, State1 #vqstate { q4 = ?QUEUE:new() }),
-
- State3 = remove_queue_entries(Q1, AfterFun, State2),
-
- a(State3 #vqstate { q1 = ?QUEUE:new() }).
-
-wipe_index(State = #vqstate{index_state = IndexState }) ->
- State #vqstate { index_state =
- rabbit_queue_index:reset_state(IndexState) }.
-
-is_pending_ack_empty(State) ->
- count_pending_acks(State) =:= 0.
-
-count_pending_acks(#vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
-
purge(State = #vqstate { len = Len }) ->
case is_pending_ack_empty(State) of
true ->
@@ -1118,7 +1086,6 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun
end
end, {?QUEUE:new(), [], [], 0, 0}, List),
{Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}.
-
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
@@ -1344,6 +1311,42 @@ remove(AckRequired, MsgStatus = #msg_status {
State2 #vqstate {out_counter = OutCount + 1,
index_state = IndexState2})}.
+%%----------------------------------------------------------------------------
+%% Helpers for Public API purge/1 function
+%%----------------------------------------------------------------------------
+
+purge_when_pending_acks(State) ->
+ AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
+ State1 = purge1(AfterFun, State),
+ a(State1).
+
+purge_when_no_pending_acks(State) ->
+ AfterFun = process_delivers_and_acks_fun(none),
+ State1 = purge1(AfterFun, State),
+ a(wipe_index(State1)).
+
+purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
+ State1 = remove_queue_entries(Q4, AfterFun, State),
+
+ State2 = #vqstate { q1 = Q1 } =
+ purge_betas_and_deltas(AfterFun, State1 #vqstate { q4 = ?QUEUE:new() }),
+
+ State3 = remove_queue_entries(Q1, AfterFun, State2),
+
+ a(State3 #vqstate { q1 = ?QUEUE:new() }).
+
+wipe_index(State = #vqstate{index_state = IndexState }) ->
+ State #vqstate { index_state =
+ rabbit_queue_index:reset_state(IndexState) }.
+
+is_pending_ack_empty(State) ->
+ count_pending_acks(State) =:= 0.
+
+count_pending_acks(#vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+
purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) ->
case ?QUEUE:is_empty(Q3) of
true -> State;
@@ -1354,20 +1357,6 @@ purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) ->
State1#vqstate{q3 = ?QUEUE:new()}))
end.
-process_delivers_and_acks_fun(deliver_and_ack) ->
- fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
- IndexState1 = qi_deliver_and_ack(Delivers, Acks, IndexState),
- State #vqstate { index_state = IndexState1 }
- end;
-process_delivers_and_acks_fun(_) ->
- fun (_, _, State) ->
- State
- end.
-
-qi_deliver_and_ack(Delivers, Acks, IndexState) ->
- rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState)).
-
remove_queue_entries(Q, DelsAndAcksFun,
State = #vqstate{msg_store_clients = MSCState}) ->
{MsgIdsByStore, Delivers, Acks, State1} =
@@ -1391,6 +1380,20 @@ remove_queue_entries1(
cons_if(IndexOnDisk, SeqId, Acks),
stats({-1, 0}, {MsgStatus, none}, State)}.
+process_delivers_and_acks_fun(deliver_and_ack) ->
+ fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
+ IndexState1 = qi_deliver_and_ack(Delivers, Acks, IndexState),
+ State #vqstate { index_state = IndexState1 }
+ end;
+process_delivers_and_acks_fun(_) ->
+ fun (_, _, State) ->
+ State
+ end.
+
+qi_deliver_and_ack(Delivers, Acks, IndexState) ->
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState)).
+
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------