summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-02 03:14:35 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-02 03:14:35 +0100
commitdba92b6916160e849863ab7d7459e783dea81d4c (patch)
tree3228b5d22ab2d99a30de1b243411f45ee83903a6
parent755f382800d70c33e1f96750d00b98ae9bae3526 (diff)
downloadrabbitmq-server-git-dba92b6916160e849863ab7d7459e783dea81d4c.tar.gz
Next step on making startup faster is to allow the scanning of the queue indices to occur in parallel (per queue). Also, queue creation can take some substantial time due to queue_index:init. Therefore, stagger the startup of a queue so that this potentially expensive step (a) doesn't get done at all if the queue already exists, etc., and (b) doesn't block amqqueue_process:init from returning. Thus on startup we now not only do the seeding of the msg_store in parallel (per queue), but the durable queues that come up can also do the bulk of their work in parallel, thus speeding recovery substantially.
-rw-r--r--src/rabbit_amqqueue.erl50
-rw-r--r--src/rabbit_amqqueue_process.erl49
-rw-r--r--src/rabbit_misc.erl24
-rw-r--r--src/rabbit_msg_store.erl7
-rw-r--r--src/rabbit_queue_index.erl51
-rw-r--r--src/rabbit_tests.erl1
6 files changed, 130 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f0540c93a2..b6e92e0698 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -128,6 +128,7 @@
%%----------------------------------------------------------------------------
start() ->
+ ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
fun (ok) -> finished end, ok]),
@@ -152,26 +153,32 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- lists:foldl(
- fun (RecoveredQ, Acc) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> [Q|Acc];
- false -> exit(Q#amqqueue.pid, shutdown),
- Acc
- end
- end, [], DurableQueues).
+ Qs = lists:foldl(
+ fun (RecoveredQ, Acc) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client
+ %% connected to another node has deleted the queue
+ %% (and possibly re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ,
+ read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true ->
+ ok = gen_server2:cast(Q#amqqueue.pid,
+ init_variable_queue),
+ [Q|Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
+ end
+ end, [], DurableQueues),
+ [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
+ Qs.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -202,7 +209,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
end) of
not_found -> exit(Q#amqqueue.pid, shutdown),
rabbit_misc:not_found(QueueName);
- Q -> Q;
+ Q -> ok = gen_server2:cast(Q#amqqueue.pid, init_variable_queue),
+ Q;
ExistingQ -> exit(Q#amqqueue.pid, shutdown),
ExistingQ
end.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 41435c08c9..e6c8d238f8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -105,19 +105,18 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
-init(Q = #amqqueue { name = QName }) ->
+init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register
(self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
- VQS = rabbit_variable_queue:init(QName),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- variable_queue_state = VQS,
+ variable_queue_state = undefined,
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -127,25 +126,37 @@ init(Q = #amqqueue { name = QName }) ->
terminate(shutdown, #q{variable_queue_state = VQS}) ->
ok = rabbit_memory_monitor:deregister(self()),
- _VQS = rabbit_variable_queue:terminate(VQS);
+ case VQS of
+ undefined -> ok;
+ _ -> rabbit_variable_queue:terminate(VQS)
+ end;
terminate({shutdown, _}, #q{variable_queue_state = VQS}) ->
ok = rabbit_memory_monitor:deregister(self()),
- _VQS = rabbit_variable_queue:terminate(VQS);
+ case VQS of
+ undefined -> ok;
+ _ -> rabbit_variable_queue:terminate(VQS)
+ end;
terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
ok = rabbit_memory_monitor:deregister(self()),
%% FIXME: How do we cancel active subscriptions?
%% Ensure that any persisted tx messages are removed.
%% TODO: wait for all in flight tx_commits to complete
- VQS1 = rabbit_variable_queue:tx_rollback(
- lists:concat([PM || #tx { pending_messages = PM } <-
- all_tx_record()]), VQS),
- %% Delete from disk first. If we crash at this point, when a
- %% durable queue, we will be recreated at startup, possibly with
- %% partial content. The alternative is much worse however - if we
- %% called internal_delete first, we would then have a race between
- %% the disk delete and a new queue with the same name being
- %% created and published to.
- _VQS = rabbit_variable_queue:delete_and_terminate(VQS1),
+ case VQS of
+ undefined ->
+ ok;
+ _ ->
+ VQS1 = rabbit_variable_queue:tx_rollback(
+ lists:concat([PM || #tx { pending_messages = PM } <-
+ all_tx_record()]), VQS),
+ %% Delete from disk first. If we crash at this point, when
+ %% a durable queue, we will be recreated at startup,
+ %% possibly with partial content. The alternative is much
+ %% worse however - if we called internal_delete first, we
+ %% would then have a race between the disk delete and a
+ %% new queue with the same name being created and
+ %% published to.
+ rabbit_variable_queue:delete_and_terminate(VQS1)
+ end,
ok = rabbit_amqqueue:internal_delete(qname(State)).
code_change(_OldVsn, State, _Extra) ->
@@ -610,6 +621,9 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+handle_call(sync, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -815,6 +829,11 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(locked, State)
end.
+handle_cast(init_variable_queue, #q{variable_queue_state = undefined,
+ q = #amqqueue{name = QName}} = State) ->
+ noreply(
+ State #q { variable_queue_state = rabbit_variable_queue:init(QName) });
+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 81cecb38f3..3bc35ca2be 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -59,6 +59,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([recursive_delete/1]).
-import(mnesia).
-import(lists).
@@ -133,6 +134,7 @@
-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(recursive_delete/1 :: (string()) -> 'ok' | {'error', any()}).
-endif.
@@ -601,3 +603,25 @@ version_compare(A, B) ->
ANum < BNum -> lt;
ANum > BNum -> gt
end.
+
+recursive_delete(Path) ->
+ case filelib:is_dir(Path) of
+ false ->
+ case file:delete(Path) of
+ ok -> ok;
+ %% Path doesn't exist anyway
+ {error, enoent} -> ok
+ end;
+ true ->
+ case file:list_dir(Path) of
+ {ok, FileNames} ->
+ lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete(filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames);
+ {error, Error} ->
+ {error, {Path, Error}}
+ end
+ end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index a33b1a3470..5610b35e34 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/1, client_terminate/1]).
+ sync/3, client_init/1, client_terminate/1, clean/2]).
-export([sync/1, gc_done/4, set_maximum_since_use/2]). %% internal
@@ -113,6 +113,7 @@
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/1 :: (server()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(clean/2 :: (atom(), file_path()) -> 'ok').
-endif.
@@ -340,6 +341,10 @@ client_terminate(CState) ->
close_all_handles(CState),
ok.
+clean(Server, BaseDir) ->
+ Dir = filename:join(BaseDir, atom_to_list(Server)),
+ ok = rabbit_misc:recursive_delete(Dir).
+
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c0c1b40bd5..935f275451 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -37,6 +37,8 @@
find_lowest_seq_id_seg_and_next_seq_id/1,
start_persistent_msg_store/1]).
+-export([queue_index_walker_reader/3]). %% for internal use only
+
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
@@ -419,30 +421,49 @@ start_persistent_msg_store(DurableQueues) ->
%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------
-queue_index_walker([]) ->
- finished;
-queue_index_walker([QueueName|QueueNames]) ->
+queue_index_walker(DurableQueues) when is_list(DurableQueues) ->
+ queue_index_walker({DurableQueues, sets:new()});
+
+queue_index_walker({[], Kids}) ->
+ case sets:size(Kids) of
+ 0 -> finished;
+ _ -> receive
+ {found, MsgId, Count} ->
+ {MsgId, Count, {[], Kids}};
+ {finished, Child} ->
+ queue_index_walker({[], sets:del_element(Child, Kids)})
+ end
+ end;
+queue_index_walker({[QueueName | QueueNames], Kids}) ->
+ Child = make_ref(),
+ ok = worker_pool:submit_async({?MODULE, queue_index_walker_reader,
+ [QueueName, self(), Child]}),
+ queue_index_walker({QueueNames, sets:add_element(Child, Kids)}).
+
+queue_index_walker_reader(QueueName, Parent, Guid) ->
State = blank_state(QueueName),
State1 = load_journal(State),
SegNums = all_segment_nums(State1),
- queue_index_walker({SegNums, State1, QueueNames});
+ queue_index_walker_reader(Parent, Guid, State1, SegNums).
-queue_index_walker({[], State, QueueNames}) ->
+queue_index_walker_reader(Parent, Guid, State, []) ->
_State = terminate(false, State),
- queue_index_walker(QueueNames);
-queue_index_walker({[Seg | SegNums], State, QueueNames}) ->
+ Parent ! {finished, Guid};
+queue_index_walker_reader(Parent, Guid, State, [Seg | SegNums]) ->
SeqId = reconstruct_seq_id(Seg, 0),
{Messages, State1} = read_segment_entries(SeqId, State),
- queue_index_walker({Messages, State1, SegNums, QueueNames});
+ queue_index_walker_reader(Parent, Guid, SegNums, State1, Messages).
-queue_index_walker({[], State, SegNums, QueueNames}) ->
- queue_index_walker({SegNums, State, QueueNames});
-queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs],
- State, SegNums, QueueNames}) ->
+queue_index_walker_reader(Parent, Guid, SegNums, State, []) ->
+ queue_index_walker_reader(Parent, Guid, State, SegNums);
+queue_index_walker_reader(
+ Parent, Guid, SegNums, State,
+ [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) ->
case IsPersistent of
- true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
- false -> queue_index_walker({Msgs, State, SegNums, QueueNames})
- end.
+ true -> Parent ! {found, MsgId, 1};
+ false -> ok
+ end,
+ queue_index_walker_reader(Parent, Guid, SegNums, State, Msgs).
%%----------------------------------------------------------------------------
%% Minors
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3ccb83b6f4..474afbcafb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1001,6 +1001,7 @@ start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) ->
start_transient_msg_store().
start_transient_msg_store() ->
+ ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
fun (ok) -> finished end, ok]).