summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-09-07 18:15:48 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-09-07 18:15:48 +0200
commit51543033e8aee162151b5fc46215a89284d5ac5d (patch)
tree69169a6fcfd8510e92fd49dacf9edea33a0b17b8
parent8719045fbc51209dc160a08f9d9cb9c73ad3aef5 (diff)
downloadrabbitmq-server-git-51543033e8aee162151b5fc46215a89284d5ac5d.tar.gz
refactors purge_pending_ack/2 to handle delete_and_terminate
-rw-r--r--src/rabbit_variable_queue.erl74
1 files changed, 44 insertions, 30 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5e859819e4..3f0630b82b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -537,8 +537,8 @@ delete_and_terminate(_Reason, State) ->
%% part of 'purge' and 'purge_pending_ack', other than deleting
%% it.
State1 = purge_and_index_reset(State),
- State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
- purge_pending_ack(false, State1, true),
+ State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
+ purge_pa_delete_and_terminate(State1),
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
@@ -1561,16 +1561,30 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
end
end.
-purge_pending_ack(KeepPersistent, State) ->
- purge_pending_ack(KeepPersistent, State, false).
-
purge_pending_ack(KeepPersistent,
- State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA,
- index_state = IndexState,
- msg_store_clients = MSCState },
- DeleteAndTerminate) ->
+ State = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State),
+ case KeepPersistent of
+ true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
+ State1;
+ false -> IndexState1 =
+ rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ State1 #vqstate { index_state = IndexState1 }
+ end.
+
+purge_pa_delete_and_terminate(
+ State = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {_, MsgIdsByStore, State1} = purge_pending_ack1(State),
+ IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ State1 #vqstate { index_state = IndexState1 }.
+
+purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
@@ -1580,26 +1594,26 @@ purge_pending_ack(KeepPersistent,
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
disk_pending_ack = gb_trees:empty(),
qi_pending_ack = gb_trees:empty()},
+ {IndexOnDiskSeqIds, MsgIdsByStore, State1}.
- case KeepPersistent of
- true -> case orddict:find(false, MsgIdsByStore) of
- error -> State1;
- {ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
- MsgIds),
- State1
- end;
- false -> IndexState1 =
- case DeleteAndTerminate of
- %% in the DeleteAndTerminate case we need to
- %% wipe the index.
- true ->
- rabbit_queue_index:delete_and_terminate(IndexState);
- false ->
- rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState)
- end,
- [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- State1 #vqstate { index_state = IndexState1 }
+%% MsgIdsByStore is an orddict with two keys:
+%%
+%% true: holds a list of Persistent Message Ids.
+%% false: holds a list of Transient Message Ids.
+%%
+%% When we call orddict:to_list/1 we get two sets of msg ids, where
+%% IsPersistent is either true for persistent messages or false for
+%% transient ones. The msg_store_remove/3 function takes this boolean
+%% flag to determine from which store the messages should be removed
+%% from.
+remove_msgs_by_id(MsgIdsByStore, MSCState) ->
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)].
+
+remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
+ case orddict:find(false, MsgIdsByStore) of
+ error -> ok;
+ {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
end.
accumulate_ack_init() -> {[], orddict:new(), []}.