diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-07 18:15:48 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-07 18:15:48 +0200 |
| commit | 51543033e8aee162151b5fc46215a89284d5ac5d (patch) | |
| tree | 69169a6fcfd8510e92fd49dacf9edea33a0b17b8 | |
| parent | 8719045fbc51209dc160a08f9d9cb9c73ad3aef5 (diff) | |
| download | rabbitmq-server-git-51543033e8aee162151b5fc46215a89284d5ac5d.tar.gz | |
refactors purge_pending_ack/2 to handle delete_and_terminate
| -rw-r--r-- | src/rabbit_variable_queue.erl | 74 |
1 files changed, 44 insertions, 30 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5e859819e4..3f0630b82b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -537,8 +537,8 @@ delete_and_terminate(_Reason, State) -> %% part of 'purge' and 'purge_pending_ack', other than deleting %% it. State1 = purge_and_index_reset(State), - State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = - purge_pending_ack(false, State1, true), + State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = + purge_pa_delete_and_terminate(State1), case MSCStateP of undefined -> ok; _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) @@ -1561,16 +1561,30 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, end end. -purge_pending_ack(KeepPersistent, State) -> - purge_pending_ack(KeepPersistent, State, false). - purge_pending_ack(KeepPersistent, - State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, - index_state = IndexState, - msg_store_clients = MSCState }, - DeleteAndTerminate) -> + State = #vqstate { index_state = IndexState, + msg_store_clients = MSCState }) -> + {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State), + case KeepPersistent of + true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState), + State1; + false -> IndexState1 = + rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), + remove_msgs_by_id(MsgIdsByStore, MSCState), + State1 #vqstate { index_state = IndexState1 } + end. + +purge_pa_delete_and_terminate( + State = #vqstate { index_state = IndexState, + msg_store_clients = MSCState }) -> + {_, MsgIdsByStore, State1} = purge_pending_ack1(State), + IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), + remove_msgs_by_id(MsgIdsByStore, MSCState), + State1 #vqstate { index_state = IndexState1 }. + +purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( @@ -1580,26 +1594,26 @@ purge_pending_ack(KeepPersistent, State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), disk_pending_ack = gb_trees:empty(), qi_pending_ack = gb_trees:empty()}, + {IndexOnDiskSeqIds, MsgIdsByStore, State1}. - case KeepPersistent of - true -> case orddict:find(false, MsgIdsByStore) of - error -> State1; - {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, - MsgIds), - State1 - end; - false -> IndexState1 = - case DeleteAndTerminate of - %% in the DeleteAndTerminate case we need to - %% wipe the index. - true -> - rabbit_queue_index:delete_and_terminate(IndexState); - false -> - rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState) - end, - [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - State1 #vqstate { index_state = IndexState1 } +%% MsgIdsByStore is an orddict with two keys: +%% +%% true: holds a list of Persistent Message Ids. +%% false: holds a list of Transient Message Ids. +%% +%% When we call orddict:to_list/1 we get two sets of msg ids, where +%% IsPersistent is either true for persistent messages or false for +%% transient ones. The msg_store_remove/3 function takes this boolean +%% flag to determine from which store the messages should be removed +%% from. +remove_msgs_by_id(MsgIdsByStore, MSCState) -> + [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. + +remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> + case orddict:find(false, MsgIdsByStore) of + error -> ok; + {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) end. accumulate_ack_init() -> {[], orddict:new(), []}. |
