summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl19
1 files changed, 17 insertions, 2 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d3720d6712..f895e4a0fd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -535,8 +535,7 @@ terminate(_Reason, State) ->
delete_and_terminate(_Reason, State) ->
%% There is no need to interact with qi at all - which we do as
%% part of 'purge' and 'purge_pending_ack', other than deleting
- %% it. That's why the last parameter to those functions is delete
- %% and terminate.
+ %% it.
State1 = purge_when_no_pending_acks(State),
State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(false, State1, true),
@@ -1315,6 +1314,10 @@ remove(AckRequired, MsgStatus = #msg_status {
%% Helpers for Public API purge/1 function
%%----------------------------------------------------------------------------
+%% The difference between purge_when_pending_acks/1
+%% vs. purge_when_no_pending_acks/1 is that the first one issues a
+%% deliver and an ack for every message that's being removed, while
+%% the later just resets the queue index state.
purge_when_pending_acks(State) ->
AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
State1 = purge1(AfterFun, State),
@@ -1325,6 +1328,18 @@ purge_when_no_pending_acks(State) ->
State1 = purge1(AfterFun, State),
a(reset_qi_state(State1)).
+%% This function removes messages from each of {q1, q2, q3, q4}.
+%%
+%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
+%% are specially handled by purge_betas_and_deltas/2.
+%%
+%% purge_betas_and_deltas/2 loads messages from the queue index,
+%% filling up q3 and in some cases moving messages form q2 to q3 while
+%% reseting q2 to an empty queue (see maybe_deltas_to_betas/2). The
+%% messages loaded into q3 are removed by calling
+%% remove_queue_entries/3 until there are no more messages to be read
+%% from the queue index. Messags are loaded in batches from the queue
+%% index.
purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
State1 = remove_queue_entries(Q4, AfterFun, State),