summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-09-06 12:45:14 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-09-06 12:45:14 +0200
commit99634fcbbe4c5a14196a1ead4c4546cdfa4612df (patch)
tree15cc3f400b241976d7a4893394d9e82062833493
parent1d9a92b20494a4c096a36573ab4595626ff741da (diff)
downloadrabbitmq-server-git-99634fcbbe4c5a14196a1ead4c4546cdfa4612df.tar.gz
refactors purge
-rw-r--r--src/rabbit_variable_queue.erl224
1 files changed, 103 insertions, 121 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f37e023bab..62d1805a1c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -535,9 +535,9 @@ terminate(_Reason, State) ->
delete_and_terminate(_Reason, State) ->
%% There is no need to interact with qi at all - which we do as
%% part of 'purge' and 'purge_pending_ack', other than deleting
- %% it. That's why the last parameter to those function is delete
+ %% it. That's why the last parameter to those functions is delete
%% and terminate.
- {_PurgeCount, State1} = purge(State, true),
+ {_PurgeCount, State1} = purge(State),
State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(false, State1, true),
case MSCStateP of
@@ -550,57 +550,45 @@ delete_and_terminate(_Reason, State) ->
delete_crashed(#amqqueue{name = QName}) ->
ok = rabbit_queue_index:erase(QName).
-%% the default purge/1 case is to purge the queue when we are not part
-%% of a delete_and_terminate case.
-purge(State) ->
- purge(State, false).
+purge_when_pending_acks(State) ->
+ AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
+ State1 = purge1(AfterFun, State),
+ a(State1).
-purge(State = #vqstate { q4 = Q4,
- len = Len },
- DeleteAndTerminate) ->
- {IndexWiped, State1} = remove_queue_entries(Q4, State, DeleteAndTerminate),
+purge_when_no_pending_acks(State) ->
+ AfterFun = process_delivers_and_acks_fun(none),
+ State1 = purge1(AfterFun, State),
+ a(wipe_index(State1)).
+
+purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
+ State1 = remove_queue_entries(Q4, AfterFun, State),
- %% purge_betas_and_deltas takes care of wiping q3.
State2 = #vqstate { q1 = Q1 } =
- purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() },
- DeleteAndTerminate),
-
- {_, State3 = #vqstate { q2 = Q2 }} =
- remove_queue_entries(Q1, State2, DeleteAndTerminate),
-
- State4 =
- case IndexWiped of
- true ->
- %% If IndexWiped we need to update the queue length
- %% and all the related queue stats since succesive
- %% calls to remove_queue_entries after the first one
- %% are not decreasing stats.
- %%
- %% Also IndexWiped means that there were no pending
- %% acks, so the queue is empty as well, letting us
- %% reset len.
- %%
- %% Also IndexWiped means that the delta can be set to
- %% ?BLANK_DELTA since the queue is empty as well.
- %%
- %% q2 needs to be wiped as well, since if the index
- %% was wiped, then maybe_deltas_to_betas is not
- %% called, possibly leaving messages inside q2.
- {_, S} = remove_queue_entries(Q2, State3, DeleteAndTerminate),
- S#vqstate{ len = 0,
- ram_msg_count = 0,
- persistent_count = 0,
- bytes = 0,
- unacked_bytes = 0,
- ram_bytes = 0,
- persistent_bytes = 0,
- delta = ?BLANK_DELTA,
- q2 = ?QUEUE:new() };
- false ->
- State3
- end,
-
- {Len, a(State4 #vqstate { q1 = ?QUEUE:new() })}.
+ purge_betas_and_deltas(AfterFun, State1 #vqstate { q4 = ?QUEUE:new() }),
+
+ State3 = remove_queue_entries(Q1, AfterFun, State2),
+
+ a(State3 #vqstate { q1 = ?QUEUE:new() }).
+
+wipe_index(State = #vqstate{index_state = IndexState }) ->
+ State #vqstate { index_state =
+ rabbit_queue_index:reset_state(IndexState) }.
+
+is_pending_ack_empty(State) ->
+ count_pending_acks(State) =:= 0.
+
+count_pending_acks(#vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+
+purge(State = #vqstate { len = Len }) ->
+ case is_pending_ack_empty(State) of
+ true ->
+ {Len, purge_when_pending_acks(State)};
+ false ->
+ {Len, purge_when_pending_acks(State)}
+ end.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -1106,32 +1094,29 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
- {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
- lists:foldr(
- fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
- case SeqId < TransientThreshold andalso not IsPersistent of
- true -> {Filtered1,
- cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1], RRC, RB};
- false -> MsgStatus = m(beta_msg_status(M)),
- HaveMsg = msg_in_ram(MsgStatus),
- Size = msg_size(MsgStatus),
- case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA) orelse
- gb_trees:is_defined(SeqId, QPA)) of
- false -> {?QUEUE:in_r(MsgStatus, Filtered1),
- Delivers1, Acks1,
- RRC + one_if(HaveMsg),
- RB + one_if(HaveMsg) * Size};
- true -> Acc %% [0]
- end
- end
- end, {?QUEUE:new(), [], [], 0, 0}, List),
- {Filtered, RamReadyCount, RamBytes,
- rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA) ->
+ lists:foldr(
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
+ case SeqId < TransientThreshold andalso not IsPersistent of
+ true -> {Filtered1,
+ cons_if(not IsDelivered, SeqId, Delivers1),
+ [SeqId | Acks1], RRC, RB};
+ false -> MsgStatus = m(beta_msg_status(M)),
+ HaveMsg = msg_in_ram(MsgStatus),
+ Size = msg_size(MsgStatus),
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)) of
+ false -> {?QUEUE:in_r(MsgStatus, Filtered1),
+ Delivers1, Acks1,
+ RRC + one_if(HaveMsg),
+ RB + one_if(HaveMsg) * Size};
+ true -> Acc %% [0]
+ end
+ end
+ end, {?QUEUE:new(), [], [], 0, 0}, List).
+
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
@@ -1357,44 +1342,39 @@ remove(AckRequired, MsgStatus = #msg_status {
State2 #vqstate {out_counter = OutCount + 1,
index_state = IndexState2})}.
-purge_betas_and_deltas(State = #vqstate { q3 = Q3 }, DeleteAndTerminate) ->
+purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) ->
case ?QUEUE:is_empty(Q3) of
true -> State;
- false -> {WipedIndex, State1} =
- remove_queue_entries(Q3, State, DeleteAndTerminate),
- State2 = State1#vqstate{q3 = ?QUEUE:new()},
- case WipedIndex of
- true ->
- %% q3 is already empty, so we can return
- %% already.
- State2;
- false ->
- purge_betas_and_deltas(
- maybe_deltas_to_betas(State2), DeleteAndTerminate)
- end
+ false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State),
+ purge_betas_and_deltas(DelsAndAcksFun,
+ maybe_deltas_to_betas(
+ DelsAndAcksFun,
+ State1#vqstate{q3 = ?QUEUE:new()}))
+ end.
+
+process_delivers_and_acks_fun(deliver_and_ack) ->
+ fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
+ IndexState1 = qi_deliver_and_ack(Delivers, Acks, IndexState),
+ State #vqstate { index_state = IndexState1 }
+ end;
+process_delivers_and_acks_fun(_) ->
+ fun (_, _, State) ->
+ State
end.
-remove_queue_entries(Q, State) ->
- remove_queue_entries(Q, State, false).
+qi_deliver_and_ack(Delivers, Acks, IndexState) ->
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState)).
-remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
- msg_store_clients = MSCState},
- DeleteAndTerminate) ->
+remove_queue_entries(Q, DelsAndAcksFun,
+ State = #vqstate{msg_store_clients = MSCState}) ->
{MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
{orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {WipedIndex, IndexState1} =
- case is_pending_ack_empty(State1) orelse DeleteAndTerminate of
- true ->
- {true, rabbit_queue_index:reset_state(IndexState)};
- _ ->
- {false, rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))}
- end,
- {WipedIndex, State1#vqstate{index_state = IndexState1}}.
+ DelsAndAcksFun(Delivers, Acks, State1).
remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
@@ -1409,14 +1389,6 @@ remove_queue_entries1(
cons_if(IndexOnDisk, SeqId, Acks),
stats({-1, 0}, {MsgStatus, none}, State)}.
-is_pending_ack_empty(State) ->
- count_pending_acks(State) =:= 0.
-
-count_pending_acks(#vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
-
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1943,7 +1915,13 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
-maybe_deltas_to_betas(State = #vqstate {
+maybe_deltas_to_betas(State) ->
+ maybe_deltas_to_betas(
+ process_delivers_and_acks_fun(deliver_and_ack),
+ State).
+
+maybe_deltas_to_betas(DelsAndAcksFun,
+ State = #vqstate {
q2 = Q2,
delta = Delta,
q3 = Q3,
@@ -1963,20 +1941,24 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
+ {Q3a, Delivers, Acks, RamCountsInc, RamBytesInc} =
betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, QPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2,
- ram_msg_count = RamMsgCount + RamCountsInc,
- ram_bytes = RamBytes + RamBytesInc,
- disk_read_count = DiskReadCount + RamCountsInc},
+ RPA, DPA, QPA),
+ State1 = DelsAndAcksFun(Delivers, Acks,
+ State #vqstate {
+ index_state = IndexState1 }),
+
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc,
+ disk_read_count = DiskReadCount + RamCountsInc},
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
maybe_deltas_to_betas(
- State1 #vqstate {
+ DelsAndAcksFun,
+ State2 #vqstate {
delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
Q3aLen ->
Q3b = ?QUEUE:join(Q3, Q3a),
@@ -1984,14 +1966,14 @@ maybe_deltas_to_betas(State = #vqstate {
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { q2 = ?QUEUE:new(),
+ State2 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
q3 = ?QUEUE:join(Q3b, Q2) };
N when N > 0 ->
Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
count = N,
end_seq_id = DeltaSeqIdEnd }),
- State1 #vqstate { delta = Delta1,
+ State2 #vqstate { delta = Delta1,
q3 = Q3b }
end
end.