summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl20
-rw-r--r--src/rabbit_amqqueue.erl53
2 files changed, 34 insertions, 39 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b120499739..259ac0401a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -91,12 +91,6 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
--rabbit_boot_step({rabbit_amqqueue_sup,
- [{description, "queue supervisor"},
- {mfa, {rabbit_amqqueue, start, []}},
- {requires, kernel_ready},
- {enables, core_initialized}]}).
-
-rabbit_boot_step({rabbit_router,
[{description, "cluster router"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -109,7 +103,6 @@
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_node_monitor]}},
{requires, kernel_ready},
- {requires, rabbit_amqqueue_sup},
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
@@ -125,14 +118,15 @@
{mfa, {rabbit_exchange, recover, []}},
{requires, empty_db_check}]}).
--rabbit_boot_step({queue_recovery,
- [{description, "queue recovery"},
- {mfa, {rabbit_amqqueue, recover, []}},
- {requires, exchange_recovery}]}).
+-rabbit_boot_step({queue_sup_queue_recovery,
+ [{description, "queue supervisor and queue recovery"},
+ {mfa, {rabbit_amqqueue, start, []}},
+ {requires, empty_db_check}]}).
-rabbit_boot_step({persister,
- [{mfa, {rabbit_sup, start_child, [rabbit_persister]}},
- {requires, queue_recovery}]}).
+ [{mfa, {rabbit_sup, start_child,
+ [rabbit_persister]}},
+ {requires, queue_sup_queue_recovery}]}).
-rabbit_boot_step({guid_generator,
[{description, "guid generator"},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3bb4dbcdbe..e13dd9edb5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/4, delete/3, purge/1]).
+-export([start/0, declare/4, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
@@ -63,7 +63,6 @@
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
@@ -118,45 +117,47 @@
%%----------------------------------------------------------------------------
start() ->
+ DurableQueues = find_durable_queues(),
{ok,_} = supervisor:start_child(
rabbit_sup,
{rabbit_amqqueue_sup,
{rabbit_amqqueue_sup, start_link, []},
transient, infinity, supervisor, [rabbit_amqqueue_sup]}),
+ _RealDurableQueues = recover_durable_queues(DurableQueues),
ok.
-recover() ->
- ok = recover_durable_queues(),
- ok.
-
-recover_durable_queues() ->
+find_durable_queues() ->
Node = node(),
- lists:foreach(
- fun (RecoveredQ) ->
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node]))
+ end).
+
+recover_durable_queues(DurableQueues) ->
+ lists:foldl(
+ fun (RecoveredQ, Acc) ->
Q = start_queue_process(RecoveredQ),
%% We need to catch the case where a client connected to
%% another node has deleted the queue (and possibly
%% re-created it).
case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
+ fun () ->
+ case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ,
+ read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
+ true -> [Q | Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
end
- end,
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end)),
- ok.
+ end, [], DurableQueues).
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,