diff options
| author | Tim Watson <tim@rabbitmq.com> | 2014-01-30 14:22:38 +0000 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2014-01-30 14:22:38 +0000 |
| commit | eec669407e0c7ce029068d544291e3304b540d3f (patch) | |
| tree | 3fbfa23dd5116b38465fcf3c3289d9b17c4dfae5 | |
| parent | b481bb397c875bca942c2aae5dfa339599f306bc (diff) | |
| download | rabbitmq-server-git-eec669407e0c7ce029068d544291e3304b540d3f.tar.gz | |
Perform (and wait for) queue recovery in parallel
| -rw-r--r-- | src/rabbit_amqqueue.erl | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eeb0e0bf03..15928bf39d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -232,10 +232,21 @@ find_durable_queues() -> end). recover_durable_queues(QueuesAndRecoveryTerms) -> - Qs = [{start_queue_process(node(), Q), Terms} || - {Q, Terms} <- QueuesAndRecoveryTerms], - [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs, - gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}]. + {QsInit, QueuesDict} = + lists:foldl( + fun({Q, Terms}, {Qs, Dict}) -> + Q1 = #amqqueue{ pid = Pid } = start_queue_process(node(), Q), + {[{Pid, Terms}|Qs], dict:store(Pid, Q1, Dict)} + end, {[], dict:new()}, QueuesAndRecoveryTerms), + + {Results, Failures} = + gen_server2:mcall([{Pid, {init, {self(), Terms}}} || + {Pid, Terms} <- QsInit]), + + %% TODO: get the queue name somehow, without doing *yet another* traversal.. + [rabbit_log:error("Queue ~p failed to initialise: ~p~n", + [Pid, Error]) || {Pid, Error} <- Failures], + [dict:fetch(Pid, QueuesDict) || {Pid, {new, _Q}} <- Results]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), |
