summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-09-04 10:52:31 +0100
committerAlvaro Videla <videlalvaro@gmail.com>2015-09-04 10:52:31 +0100
commit5f410cd8ea07b0b893720ae196ec87cecb90d109 (patch)
tree51ef37c60e8a5c26e4508047e66ab94e3caa757e
parent4e40d079efb5f1bb4a0ea856626693c9f4fec596 (diff)
downloadrabbitmq-server-git-5f410cd8ea07b0b893720ae196ec87cecb90d109.tar.gz
takes into account the delete_and_terminate case
-rw-r--r--src/rabbit_variable_queue.erl145
1 files changed, 106 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b07899817d..f37e023bab 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -533,35 +533,74 @@ terminate(_Reason, State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
- %% TODO: there is no need to interact with qi at all - which we do
- %% as part of 'purge' and 'purge_pending_ack', other than
- %% deleting it.
- {_PurgeCount, State1} = purge(State),
- State2 = #vqstate { index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT} } =
- purge_pending_ack(false, State1),
- IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
+ %% There is no need to interact with qi at all - which we do as
+ %% part of 'purge' and 'purge_pending_ack', other than deleting
+ %% it. That's why the last parameter to those function is delete
+ %% and terminate.
+ {_PurgeCount, State1} = purge(State, true),
+ State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
+ purge_pending_ack(false, State1, true),
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
end,
rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- a(State2 #vqstate { index_state = IndexState1,
- msg_store_clients = undefined }).
+ a(State2 #vqstate { msg_store_clients = undefined }).
delete_crashed(#amqqueue{name = QName}) ->
ok = rabbit_queue_index:erase(QName).
+%% the default purge/1 case is to purge the queue when we are not part
+%% of a delete_and_terminate case.
+purge(State) ->
+ purge(State, false).
+
purge(State = #vqstate { q4 = Q4,
- len = Len }) ->
- State1 = remove_queue_entries(Q4, State),
+ len = Len },
+ DeleteAndTerminate) ->
+ {IndexWiped, State1} = remove_queue_entries(Q4, State, DeleteAndTerminate),
+ %% purge_betas_and_deltas takes care of wiping q3.
State2 = #vqstate { q1 = Q1 } =
- purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }),
-
- State3 = remove_queue_entries(Q1, State2),
-
- {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}.
+ purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() },
+ DeleteAndTerminate),
+
+ {_, State3 = #vqstate { q2 = Q2 }} =
+ remove_queue_entries(Q1, State2, DeleteAndTerminate),
+
+ State4 =
+ case IndexWiped of
+ true ->
+ %% If IndexWiped we need to update the queue length
+ %% and all the related queue stats since succesive
+ %% calls to remove_queue_entries after the first one
+ %% are not decreasing stats.
+ %%
+ %% Also IndexWiped means that there were no pending
+ %% acks, so the queue is empty as well, letting us
+ %% reset len.
+ %%
+ %% Also IndexWiped means that the delta can be set to
+ %% ?BLANK_DELTA since the queue is empty as well.
+ %%
+ %% q2 needs to be wiped as well, since if the index
+ %% was wiped, then maybe_deltas_to_betas is not
+ %% called, possibly leaving messages inside q2.
+ {_, S} = remove_queue_entries(Q2, State3, DeleteAndTerminate),
+ S#vqstate{ len = 0,
+ ram_msg_count = 0,
+ persistent_count = 0,
+ bytes = 0,
+ unacked_bytes = 0,
+ ram_bytes = 0,
+ persistent_bytes = 0,
+ delta = ?BLANK_DELTA,
+ q2 = ?QUEUE:new() };
+ false ->
+ State3
+ end,
+
+ {Len, a(State4 #vqstate { q1 = ?QUEUE:new() })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -751,10 +790,8 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+depth(State) ->
+ len(State) + count_pending_acks(State).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -1320,34 +1357,44 @@ remove(AckRequired, MsgStatus = #msg_status {
State2 #vqstate {out_counter = OutCount + 1,
index_state = IndexState2})}.
-purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) ->
+purge_betas_and_deltas(State = #vqstate { q3 = Q3 }, DeleteAndTerminate) ->
case ?QUEUE:is_empty(Q3) of
true -> State;
- false -> State1 = remove_queue_entries(Q3, State),
- purge_betas_and_deltas(maybe_deltas_to_betas(
- State1#vqstate{q3 = ?QUEUE:new()}))
+ false -> {WipedIndex, State1} =
+ remove_queue_entries(Q3, State, DeleteAndTerminate),
+ State2 = State1#vqstate{q3 = ?QUEUE:new()},
+ case WipedIndex of
+ true ->
+ %% q3 is already empty, so we can return
+ %% already.
+ State2;
+ false ->
+ purge_betas_and_deltas(
+ maybe_deltas_to_betas(State2), DeleteAndTerminate)
+ end
end.
+remove_queue_entries(Q, State) ->
+ remove_queue_entries(Q, State, false).
+
remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
- msg_store_clients = MSCState,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA}) ->
- {MsgIdsByStore, Delivers, Acks, State1} =
+ msg_store_clients = MSCState},
+ DeleteAndTerminate) ->
+ {MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
{orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- IndexState1 =
- case gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA) of
- 0 ->
- rabbit_queue_index:reset_state(IndexState);
+ {WipedIndex, IndexState1} =
+ case is_pending_ack_empty(State1) orelse DeleteAndTerminate of
+ true ->
+ {true, rabbit_queue_index:reset_state(IndexState)};
_ ->
- rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))
+ {false, rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))}
end,
- State1#vqstate{index_state = IndexState1}.
+ {WipedIndex, State1#vqstate{index_state = IndexState1}}.
remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
@@ -1362,6 +1409,14 @@ remove_queue_entries1(
cons_if(IndexOnDisk, SeqId, Acks),
stats({-1, 0}, {MsgStatus, none}, State)}.
+is_pending_ack_empty(State) ->
+ count_pending_acks(State) =:= 0.
+
+count_pending_acks(#vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1514,12 +1569,16 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
end
end.
+purge_pending_ack(KeepPersistent, State) ->
+ purge_pending_ack(KeepPersistent, State, false).
+
purge_pending_ack(KeepPersistent,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA,
index_state = IndexState,
- msg_store_clients = MSCState }) ->
+ msg_store_clients = MSCState },
+ DeleteAndTerminate) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
@@ -1538,7 +1597,14 @@ purge_pending_ack(KeepPersistent,
State1
end;
false -> IndexState1 =
- rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ case DeleteAndTerminate of
+ %% in the DeleteAndTerminate case we need to
+ %% wipe the index.
+ true ->
+ rabbit_queue_index:delete_and_terminate(IndexState);
+ false ->
+ rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState)
+ end,
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
@@ -1904,6 +1970,7 @@ maybe_deltas_to_betas(State = #vqstate {
ram_msg_count = RamMsgCount + RamCountsInc,
ram_bytes = RamBytes + RamBytesInc,
disk_read_count = DiskReadCount + RamCountsInc},
+
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being