diff options
Diffstat (limited to 'src/rabbit.erl')
| -rw-r--r-- | src/rabbit.erl | 47 |
1 files changed, 37 insertions, 10 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 807e9e7d58..86c53ff665 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -27,7 +27,7 @@ %%--------------------------------------------------------------------------- %% Boot steps. --export([maybe_insert_default_data/0, boot_delegate/0]). +-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]). -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, @@ -123,15 +123,9 @@ {requires, core_initialized}, {enables, routing_ready}]}). --rabbit_boot_step({exchange_recovery, - [{description, "exchange recovery"}, - {mfa, {rabbit_exchange, recover, []}}, - {requires, empty_db_check}, - {enables, routing_ready}]}). - --rabbit_boot_step({queue_sup_queue_recovery, - [{description, "queue supervisor and queue recovery"}, - {mfa, {rabbit_amqqueue, start, []}}, +-rabbit_boot_step({recovery, + [{description, "exchange / queue recovery"}, + {mfa, {rabbit, recover, []}}, {requires, empty_db_check}, {enables, routing_ready}]}). @@ -186,6 +180,7 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). +-spec(recover/0 :: () -> 'ok'). -endif. @@ -464,6 +459,38 @@ boot_delegate() -> {ok, Count} = application:get_env(rabbit, delegate_count), rabbit_sup:start_child(delegate_sup, [Count]). +recover() -> + Xs = rabbit_exchange:recover(), + Qs = rabbit_amqqueue:start(), + Bs = rabbit_binding:recover(Qs), + {RecXBs, NoRecSrcBs} = filter_recovered_exchanges(Xs, Bs), + ok = recovery_callbacks(RecXBs, NoRecSrcBs). + +filter_recovered_exchanges(Xs, Bs) -> + RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]), + lists:foldl( + fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) -> + case dict:find(Src, RecXs) of + {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs}; + error -> {RecXBs, dict:append(Src, B, NoRecXBs)} + end + end, {dict:new(), dict:new()}, Bs). + +recovery_callbacks(RecXBs, NoRecXBs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> ok end, + fun (ok, Tx) -> + dict:map(fun (X, Bs) -> + rabbit_exchange:callback(X, start, [Tx, X, Bs]) + end, RecXBs), + dict:map(fun (Src, Bs) -> + {ok, X} = rabbit_exchange:lookup(Src), + rabbit_exchange:callback(X, add_bindings, + [Tx, X, Bs]) + end, NoRecXBs) + end), + ok. + maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of true -> insert_default_data(); |
