summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl34
-rw-r--r--src/rabbit_variable_queue.erl18
2 files changed, 39 insertions, 13 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index adec2cf2e5..88bcc9e575 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_index).
--export([erase/1, init/3, recover/6,
+-export([erase/1, init/3, reset_state/1, recover/6,
terminate/2, delete_and_terminate/1,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
@@ -257,10 +257,19 @@
erase(Name) ->
#qistate { dir = Dir } = blank_state(Name),
- case rabbit_file:is_dir(Dir) of
- true -> rabbit_file:recursive_delete([Dir]);
- false -> ok
- end.
+ erase_index_dir(Dir).
+
+%% used during variable queue purge when there are no pending acks
+reset_state(#qistate{ dir = Dir,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun,
+ journal_handle = JournalHdl }) ->
+ ok = erase_index_dir(Dir),
+ ok = case JournalHdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(JournalHdl)
+ end,
+ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).
init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
@@ -431,11 +440,22 @@ all_queue_directory_names(Dir) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
+erase_index_dir(Dir) ->
+ case rabbit_file:is_dir(Dir) of
+ true -> rabbit_file:recursive_delete([Dir]);
+ false -> ok
+ end.
+
blank_state(QueueName) ->
blank_state_dir(
filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
blank_state_dir(Dir) ->
+ blank_state_dir_funs(Dir,
+ fun (_) -> ok end,
+ fun (_) -> ok end).
+
+blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -443,8 +463,8 @@ blank_state_dir(Dir) ->
journal_handle = undefined,
dirty_count = 0,
max_journal_entries = MaxJournal,
- on_sync = fun (_) -> ok end,
- on_sync_msg = fun (_) -> ok end,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun,
unconfirmed = gb_sets:new(),
unconfirmed_msg = gb_sets:new() }.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ff1d15952f..b07899817d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -554,9 +554,6 @@ delete_crashed(#amqqueue{name = QName}) ->
purge(State = #vqstate { q4 = Q4,
len = Len }) ->
- %% TODO: when there are no pending acks, which is a common case,
- %% we could simply wipe the qi instead of issuing delivers and
- %% acks for all the messages.
State1 = remove_queue_entries(Q4, State),
State2 = #vqstate { q1 = Q1 } =
@@ -1332,15 +1329,24 @@ purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) ->
end.
remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
- msg_store_clients = MSCState}) ->
+ msg_store_clients = MSCState,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
{MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
{orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- IndexState1 = rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
+ IndexState1 =
+ case gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA) of
+ 0 ->
+ rabbit_queue_index:reset_state(IndexState);
+ _ ->
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))
+ end,
State1#vqstate{index_state = IndexState1}.
remove_queue_entries1(