summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2014-01-30 14:22:38 +0000
committerTim Watson <tim@rabbitmq.com>2014-01-30 14:22:38 +0000
commiteec669407e0c7ce029068d544291e3304b540d3f (patch)
tree3fbfa23dd5116b38465fcf3c3289d9b17c4dfae5
parentb481bb397c875bca942c2aae5dfa339599f306bc (diff)
downloadrabbitmq-server-git-eec669407e0c7ce029068d544291e3304b540d3f.tar.gz
Perform (and wait for) queue recovery in parallel
-rw-r--r--src/rabbit_amqqueue.erl19
1 files changed, 15 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eeb0e0bf03..15928bf39d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -232,10 +232,21 @@ find_durable_queues() ->
end).
recover_durable_queues(QueuesAndRecoveryTerms) ->
- Qs = [{start_queue_process(node(), Q), Terms} ||
- {Q, Terms} <- QueuesAndRecoveryTerms],
- [Q || {Q = #amqqueue{ pid = Pid }, Terms} <- Qs,
- gen_server2:call(Pid, {init, {self(), Terms}}, infinity) == {new, Q}].
+ {QsInit, QueuesDict} =
+ lists:foldl(
+ fun({Q, Terms}, {Qs, Dict}) ->
+ Q1 = #amqqueue{ pid = Pid } = start_queue_process(node(), Q),
+ {[{Pid, Terms}|Qs], dict:store(Pid, Q1, Dict)}
+ end, {[], dict:new()}, QueuesAndRecoveryTerms),
+
+ {Results, Failures} =
+ gen_server2:mcall([{Pid, {init, {self(), Terms}}} ||
+ {Pid, Terms} <- QsInit]),
+
+ %% TODO: get the queue name somehow, without doing *yet another* traversal..
+ [rabbit_log:error("Queue ~p failed to initialise: ~p~n",
+ [Pid, Error]) || {Pid, Error} <- Failures],
+ [dict:fetch(Pid, QueuesDict) || {Pid, {new, _Q}} <- Results].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),