diff options
| -rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 9 |
10 files changed, 67 insertions, 82 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 8163b6f2c8..fd3ddf7e11 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -20,8 +20,7 @@ -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(start/3 :: (boolean(), rabbit_types:exchange(), - [rabbit_types:binding()]) -> 'ok'). +-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). -spec(delete/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(add_bindings/3 :: (boolean(), rabbit_types:exchange(), diff --git a/src/rabbit.erl b/src/rabbit.erl index fe392c5f4f..2840a5b747 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -124,7 +124,7 @@ {enables, routing_ready}]}). -rabbit_boot_step({recovery, - [{description, "exchange / queue recovery"}, + [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, {requires, empty_db_check}, {enables, routing_ready}]}). @@ -461,35 +461,8 @@ boot_delegate() -> recover() -> XNames = rabbit_exchange:recover(), - QNames = rabbit_amqqueue:start(), - Bs = rabbit_binding:recover(XNames, QNames), - {RecXBs, NoRecXBs} = filter_recovered_exchanges(XNames, Bs), - ok = recovery_callbacks(RecXBs, NoRecXBs). - -filter_recovered_exchanges(Xs, Bs) -> - RecXs = sets:from_list(Xs), - lists:foldl( - fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) -> - case sets:is_element(Src, RecXs) of - true -> {dict:append(Src, B, RecXBs), NoRecXBs}; - false -> {RecXBs, dict:append(Src, B, NoRecXBs)} - end - end, {dict:new(), dict:new()}, Bs). - -recovery_callbacks(RecXBs, NoRecXBs) -> - CB = fun (Tx, F, XBs) -> - dict:map(fun (XName, Bs) -> - {ok, X} = rabbit_exchange:lookup(XName), - rabbit_exchange:callback(X, F, [Tx, X, Bs]) - end, XBs) - end, - rabbit_misc:execute_mnesia_transaction( - fun () -> ok end, - fun (ok, Tx) -> - CB(Tx, start, RecXBs), - CB(Tx, add_bindings, NoRecXBs) - end), - ok. + QNames = rabbit_amqqueue:recover(), + rabbit_binding:recover(XNames, QNames). maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6267b823f2..34ed88bc12 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -16,7 +16,8 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). +-export([recover/0, stop/0, declare/5, delete_immediately/1, delete/3, + purge/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -57,7 +58,7 @@ -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). --spec(start/0 :: () -> [rabbit_types:amqqueue()]). +-spec(recover/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). -spec(declare/5 :: (name(), boolean(), boolean(), @@ -157,7 +158,7 @@ %%---------------------------------------------------------------------------- -start() -> +recover() -> DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), @@ -186,8 +187,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - [Q#amqqueue.name || Q <- Qs, - gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == {new, Q}]. + [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, + gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 4779392084..5ac9c8718b 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -50,8 +50,8 @@ -opaque(deletions() :: dict()). --spec(recover/2 :: ([rabbit_types:exchange()], [rabbit_types:amqqueue()]) -> - [rabbit_types:binding()]). +-spec(recover/2 :: ([rabbit_types:resource()], [rabbit_types:resource()]) -> + 'ok'). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> add_res()). -spec(remove/1 :: (rabbit_types:binding()) -> remove_res()). @@ -94,27 +94,38 @@ destination_name, destination_kind, routing_key, arguments]). -recover(XsL, QsL) -> - Xs = sets:from_list(XsL), - Qs = sets:from_list(QsL), - rabbit_misc:table_fold( - fun (Route = #route{binding = B}, Acc) -> - case should_recover(B, Xs, Qs) of - true -> {_, Rev} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, Route, write), - ok = mnesia:write(rabbit_reverse_route, Rev, write), - [B | Acc]; - false -> Acc - end - end, [], rabbit_durable_route). +recover(XNames, QNames) -> + XNameSet = sets:from_list(XNames), + QNameSet = sets:from_list(QNames), + XBs = rabbit_misc:table_fold( + fun (Route = #route{binding = B = #binding{source = Src}}, Acc) -> + case should_recover(B, XNameSet, QNameSet) of + true -> {_, Rev} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, Route, write), + ok = mnesia:write(rabbit_reverse_route, Rev, + write), + rabbit_misc:dict_cons(Src, B, Acc); + false -> Acc + end + end, dict:new(), rabbit_durable_route), + rabbit_misc:execute_mnesia_transaction( + fun () -> ok end, + fun (ok, Tx) -> + dict:map(fun (XName, Bindings) -> + {ok, X} = rabbit_exchange:lookup(XName), + rabbit_exchange:callback(X, add_bindings, + [Tx, X, Bindings]) + end, XBs) + end), + ok. -should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }}, - XNames, QNames) -> +should_recover(B = #binding{destination = Dst = #resource{ kind = Kind }}, + XNameSet, QNameSet) -> case mnesia:read({rabbit_route, B}) of - [] -> sets:is_element(Dest, case Kind of - exchange -> XNames; - queue -> QNames - end); + [] -> sets:is_element(Dst, case Kind of + exchange -> XNameSet; + queue -> QNameSet + end); _ -> false end. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e05a881207..7268b15de0 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -type(type() :: atom()). -type(fun_name() :: atom()). --spec(recover/0 :: () -> 'ok'). +-spec(recover/0 :: () -> [rabbit_types:resource()]). -spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), @@ -83,14 +83,20 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> - rabbit_misc:table_fold( - fun (X = #exchange{name = XName}, Acc) -> - case mnesia:read({rabbit_exchange, XName}) of - [] -> ok = mnesia:write(rabbit_exchange, X, write), - [XName | Acc]; - [_] -> Acc - end - end, [], rabbit_durable_exchange). + Xs = rabbit_misc:table_fold( + fun (X = #exchange{name = XName}, Acc) -> + case mnesia:read({rabbit_exchange, XName}) of + [] -> ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc]; + [_] -> Acc + end + end, [], rabbit_durable_exchange), + rabbit_misc:execute_mnesia_transaction( + fun () -> ok end, + fun (ok, Tx) -> + [rabbit_exchange:callback(X, create, [Tx, X]) || X <- Xs] + end), + [XName || #exchange{name = XName} <- Xs]. callback(#exchange{type = XType}, Fun, Args) -> apply(type_to_module(XType), Fun, Args). @@ -120,7 +126,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = (type_to_module(Type)):start(Tx, Exchange, []), + ok = (type_to_module(Type)):create(Tx, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index ad08eb8646..0fede0bef2 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -27,7 +27,7 @@ behaviour_info(callbacks) -> {validate, 1}, %% called after declaration and recovery - {start, 3}, + {create, 2}, %% called after exchange (auto)deletion. {delete, 3}, diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 1658c9f849..200c299727 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, start/3, delete/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -40,7 +40,7 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. -start(_Tx, _X, _Bs) -> ok. +create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. add_bindings(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 83afdd717d..6256894984 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, start/3, delete/3, add_bindings/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -39,7 +39,7 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. -start(_Tx, _X, _Bs) -> ok. +create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. add_bindings(_Tx, _X, _Bs) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 0fe8404fb7..258e785ade 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, start/3, delete/3, add_bindings/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -113,7 +113,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). validate(_X) -> ok. -start(_Tx, _X, _Bs) -> ok. +create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. add_bindings(_Tx, _X, _Bs) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 52f468ee1b..efa5fb52e2 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, start/3, delete/3, add_bindings/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -48,12 +48,7 @@ route(#exchange{name = X}, validate(_X) -> ok. -start(true, _X, Bs) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) - end); -start(false, _X, _Bs) -> +create(_Tx, _X) -> ok. delete(true, #exchange{name = X}, _Bs) -> |
