summaryrefslogtreecommitdiff
path: root/src/rabbit.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-31 16:04:41 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-03-31 16:04:41 +0100
commit3b311e9cbb3c4f6d3031ab6ee5c6757c18d3d1cf (patch)
tree3742d0a17f6a1ad607ecaf93dd9d066920a557b4 /src/rabbit.erl
parentb0277b0d617c5d6050f0a3e6aae3c271be9c3c58 (diff)
downloadrabbitmq-server-git-3b311e9cbb3c4f6d3031ab6ee5c6757c18d3d1cf.tar.gz
Unify recovery into one boot step, based binding recovery on the queues that have been recovered.
Diffstat (limited to 'src/rabbit.erl')
-rw-r--r--src/rabbit.erl47
1 files changed, 37 insertions, 10 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 807e9e7d58..86c53ff665 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0, boot_delegate/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -123,15 +123,9 @@
{requires, core_initialized},
{enables, routing_ready}]}).
--rabbit_boot_step({exchange_recovery,
- [{description, "exchange recovery"},
- {mfa, {rabbit_exchange, recover, []}},
- {requires, empty_db_check},
- {enables, routing_ready}]}).
-
--rabbit_boot_step({queue_sup_queue_recovery,
- [{description, "queue supervisor and queue recovery"},
- {mfa, {rabbit_amqqueue, start, []}},
+-rabbit_boot_step({recovery,
+ [{description, "exchange / queue recovery"},
+ {mfa, {rabbit, recover, []}},
{requires, empty_db_check},
{enables, routing_ready}]}).
@@ -186,6 +180,7 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
+-spec(recover/0 :: () -> 'ok').
-endif.
@@ -464,6 +459,38 @@ boot_delegate() ->
{ok, Count} = application:get_env(rabbit, delegate_count),
rabbit_sup:start_child(delegate_sup, [Count]).
+recover() ->
+ Xs = rabbit_exchange:recover(),
+ Qs = rabbit_amqqueue:start(),
+ Bs = rabbit_binding:recover(Qs),
+ {RecXBs, NoRecSrcBs} = filter_recovered_exchanges(Xs, Bs),
+ ok = recovery_callbacks(RecXBs, NoRecSrcBs).
+
+filter_recovered_exchanges(Xs, Bs) ->
+ RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]),
+ lists:foldl(
+ fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) ->
+ case dict:find(Src, RecXs) of
+ {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs};
+ error -> {RecXBs, dict:append(Src, B, NoRecXBs)}
+ end
+ end, {dict:new(), dict:new()}, Bs).
+
+recovery_callbacks(RecXBs, NoRecXBs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok end,
+ fun (ok, Tx) ->
+ dict:map(fun (X, Bs) ->
+ rabbit_exchange:callback(X, start, [Tx, X, Bs])
+ end, RecXBs),
+ dict:map(fun (Src, Bs) ->
+ {ok, X} = rabbit_exchange:lookup(Src),
+ rabbit_exchange:callback(X, add_bindings,
+ [Tx, X, Bs])
+ end, NoRecXBs)
+ end),
+ ok.
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();