diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-04 10:52:31 +0100 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-04 10:52:31 +0100 |
| commit | 5f410cd8ea07b0b893720ae196ec87cecb90d109 (patch) | |
| tree | 51ef37c60e8a5c26e4508047e66ab94e3caa757e /src | |
| parent | 4e40d079efb5f1bb4a0ea856626693c9f4fec596 (diff) | |
| download | rabbitmq-server-git-5f410cd8ea07b0b893720ae196ec87cecb90d109.tar.gz | |
takes into account the delete_and_terminate case
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 145 |
1 files changed, 106 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b07899817d..f37e023bab 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -533,35 +533,74 @@ terminate(_Reason, State) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(_Reason, State) -> - %% TODO: there is no need to interact with qi at all - which we do - %% as part of 'purge' and 'purge_pending_ack', other than - %% deleting it. - {_PurgeCount, State1} = purge(State), - State2 = #vqstate { index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT} } = - purge_pending_ack(false, State1), - IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), + %% There is no need to interact with qi at all - which we do as + %% part of 'purge' and 'purge_pending_ack', other than deleting + %% it. That's why the last parameter to those function is delete + %% and terminate. + {_PurgeCount, State1} = purge(State, true), + State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = + purge_pending_ack(false, State1, true), case MSCStateP of undefined -> ok; _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) end, rabbit_msg_store:client_delete_and_terminate(MSCStateT), - a(State2 #vqstate { index_state = IndexState1, - msg_store_clients = undefined }). + a(State2 #vqstate { msg_store_clients = undefined }). delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). +%% the default purge/1 case is to purge the queue when we are not part +%% of a delete_and_terminate case. +purge(State) -> + purge(State, false). + purge(State = #vqstate { q4 = Q4, - len = Len }) -> - State1 = remove_queue_entries(Q4, State), + len = Len }, + DeleteAndTerminate) -> + {IndexWiped, State1} = remove_queue_entries(Q4, State, DeleteAndTerminate), + %% purge_betas_and_deltas takes care of wiping q3. State2 = #vqstate { q1 = Q1 } = - purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }), - - State3 = remove_queue_entries(Q1, State2), - - {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}. + purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }, + DeleteAndTerminate), + + {_, State3 = #vqstate { q2 = Q2 }} = + remove_queue_entries(Q1, State2, DeleteAndTerminate), + + State4 = + case IndexWiped of + true -> + %% If IndexWiped we need to update the queue length + %% and all the related queue stats since succesive + %% calls to remove_queue_entries after the first one + %% are not decreasing stats. + %% + %% Also IndexWiped means that there were no pending + %% acks, so the queue is empty as well, letting us + %% reset len. + %% + %% Also IndexWiped means that the delta can be set to + %% ?BLANK_DELTA since the queue is empty as well. + %% + %% q2 needs to be wiped as well, since if the index + %% was wiped, then maybe_deltas_to_betas is not + %% called, possibly leaving messages inside q2. + {_, S} = remove_queue_entries(Q2, State3, DeleteAndTerminate), + S#vqstate{ len = 0, + ram_msg_count = 0, + persistent_count = 0, + bytes = 0, + unacked_bytes = 0, + ram_bytes = 0, + persistent_bytes = 0, + delta = ?BLANK_DELTA, + q2 = ?QUEUE:new() }; + false -> + State3 + end, + + {Len, a(State4 #vqstate { q1 = ?QUEUE:new() })}. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -751,10 +790,8 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -depth(State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). +depth(State) -> + len(State) + count_pending_acks(State). set_ram_duration_target( DurationTarget, State = #vqstate { @@ -1320,34 +1357,44 @@ remove(AckRequired, MsgStatus = #msg_status { State2 #vqstate {out_counter = OutCount + 1, index_state = IndexState2})}. -purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> +purge_betas_and_deltas(State = #vqstate { q3 = Q3 }, DeleteAndTerminate) -> case ?QUEUE:is_empty(Q3) of true -> State; - false -> State1 = remove_queue_entries(Q3, State), - purge_betas_and_deltas(maybe_deltas_to_betas( - State1#vqstate{q3 = ?QUEUE:new()})) + false -> {WipedIndex, State1} = + remove_queue_entries(Q3, State, DeleteAndTerminate), + State2 = State1#vqstate{q3 = ?QUEUE:new()}, + case WipedIndex of + true -> + %% q3 is already empty, so we can return + %% already. + State2; + false -> + purge_betas_and_deltas( + maybe_deltas_to_betas(State2), DeleteAndTerminate) + end end. +remove_queue_entries(Q, State) -> + remove_queue_entries(Q, State, false). + remove_queue_entries(Q, State = #vqstate{index_state = IndexState, - msg_store_clients = MSCState, - ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA}) -> - {MsgIdsByStore, Delivers, Acks, State1} = + msg_store_clients = MSCState}, + DeleteAndTerminate) -> + {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - IndexState1 = - case gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA) of - 0 -> - rabbit_queue_index:reset_state(IndexState); + {WipedIndex, IndexState1} = + case is_pending_ack_empty(State1) orelse DeleteAndTerminate of + true -> + {true, rabbit_queue_index:reset_state(IndexState)}; _ -> - rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState)) + {false, rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState))} end, - State1#vqstate{index_state = IndexState1}. + {WipedIndex, State1#vqstate{index_state = IndexState1}}. remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, @@ -1362,6 +1409,14 @@ remove_queue_entries1( cons_if(IndexOnDisk, SeqId, Acks), stats({-1, 0}, {MsgStatus, none}, State)}. +is_pending_ack_empty(State) -> + count_pending_acks(State) =:= 0. + +count_pending_acks(#vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). + %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1514,12 +1569,16 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, end end. +purge_pending_ack(KeepPersistent, State) -> + purge_pending_ack(KeepPersistent, State, false). + purge_pending_ack(KeepPersistent, State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA, index_state = IndexState, - msg_store_clients = MSCState }) -> + msg_store_clients = MSCState }, + DeleteAndTerminate) -> F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( @@ -1538,7 +1597,14 @@ purge_pending_ack(KeepPersistent, State1 end; false -> IndexState1 = - rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), + case DeleteAndTerminate of + %% in the DeleteAndTerminate case we need to + %% wipe the index. + true -> + rabbit_queue_index:delete_and_terminate(IndexState); + false -> + rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState) + end, [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } @@ -1904,6 +1970,7 @@ maybe_deltas_to_betas(State = #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, ram_bytes = RamBytes + RamBytesInc, disk_read_count = DiskReadCount + RamCountsInc}, + case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being |
