diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-03 00:55:31 +0100 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-03 01:00:33 +0100 |
| commit | 4e40d079efb5f1bb4a0ea856626693c9f4fec596 (patch) | |
| tree | 63783827b2248a78757cfe8f4dc0324f14555265 | |
| parent | 5b71eb3d3207f30dff02c3b5fd976058eed7fbdc (diff) | |
| download | rabbitmq-server-git-4e40d079efb5f1bb4a0ea856626693c9f4fec596.tar.gz | |
Improves BQ:purge performance when there are no pending acks
Fixes #295
| -rw-r--r-- | src/rabbit_queue_index.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 18 |
2 files changed, 39 insertions, 13 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index adec2cf2e5..88bcc9e575 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_index). --export([erase/1, init/3, recover/6, +-export([erase/1, init/3, reset_state/1, recover/6, terminate/2, delete_and_terminate/1, publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). @@ -257,10 +257,19 @@ erase(Name) -> #qistate { dir = Dir } = blank_state(Name), - case rabbit_file:is_dir(Dir) of - true -> rabbit_file:recursive_delete([Dir]); - false -> ok - end. + erase_index_dir(Dir). + +%% used during variable queue purge when there are no pending acks +reset_state(#qistate{ dir = Dir, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun, + journal_handle = JournalHdl }) -> + ok = erase_index_dir(Dir), + ok = case JournalHdl of + undefined -> ok; + _ -> file_handle_cache:close(JournalHdl) + end, + blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun). init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), @@ -431,11 +440,22 @@ all_queue_directory_names(Dir) -> %% startup and shutdown %%---------------------------------------------------------------------------- +erase_index_dir(Dir) -> + case rabbit_file:is_dir(Dir) of + true -> rabbit_file:recursive_delete([Dir]); + false -> ok + end. + blank_state(QueueName) -> blank_state_dir( filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). blank_state_dir(Dir) -> + blank_state_dir_funs(Dir, + fun (_) -> ok end, + fun (_) -> ok end). + +blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -443,8 +463,8 @@ blank_state_dir(Dir) -> journal_handle = undefined, dirty_count = 0, max_journal_entries = MaxJournal, - on_sync = fun (_) -> ok end, - on_sync_msg = fun (_) -> ok end, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun, unconfirmed = gb_sets:new(), unconfirmed_msg = gb_sets:new() }. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff1d15952f..b07899817d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -554,9 +554,6 @@ delete_crashed(#amqqueue{name = QName}) -> purge(State = #vqstate { q4 = Q4, len = Len }) -> - %% TODO: when there are no pending acks, which is a common case, - %% we could simply wipe the qi instead of issuing delivers and - %% acks for all the messages. State1 = remove_queue_entries(Q4, State), State2 = #vqstate { q1 = Q1 } = @@ -1332,15 +1329,24 @@ purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> end. remove_queue_entries(Q, State = #vqstate{index_state = IndexState, - msg_store_clients = MSCState}) -> + msg_store_clients = MSCState, + ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - IndexState1 = rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState)), + IndexState1 = + case gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA) of + 0 -> + rabbit_queue_index:reset_state(IndexState); + _ -> + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)) + end, State1#vqstate{index_state = IndexState1}. remove_queue_entries1( |
