summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_exchange_type_spec.hrl3
-rw-r--r--src/rabbit.erl33
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_binding.erl53
-rw-r--r--src/rabbit_exchange.erl26
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl9
10 files changed, 67 insertions, 82 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 8163b6f2c8..fd3ddf7e11 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -20,8 +20,7 @@
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(start/3 :: (boolean(), rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
+-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
-spec(delete/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(add_bindings/3 :: (boolean(), rabbit_types:exchange(),
diff --git a/src/rabbit.erl b/src/rabbit.erl
index fe392c5f4f..2840a5b747 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -124,7 +124,7 @@
{enables, routing_ready}]}).
-rabbit_boot_step({recovery,
- [{description, "exchange / queue recovery"},
+ [{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
{requires, empty_db_check},
{enables, routing_ready}]}).
@@ -461,35 +461,8 @@ boot_delegate() ->
recover() ->
XNames = rabbit_exchange:recover(),
- QNames = rabbit_amqqueue:start(),
- Bs = rabbit_binding:recover(XNames, QNames),
- {RecXBs, NoRecXBs} = filter_recovered_exchanges(XNames, Bs),
- ok = recovery_callbacks(RecXBs, NoRecXBs).
-
-filter_recovered_exchanges(Xs, Bs) ->
- RecXs = sets:from_list(Xs),
- lists:foldl(
- fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) ->
- case sets:is_element(Src, RecXs) of
- true -> {dict:append(Src, B, RecXBs), NoRecXBs};
- false -> {RecXBs, dict:append(Src, B, NoRecXBs)}
- end
- end, {dict:new(), dict:new()}, Bs).
-
-recovery_callbacks(RecXBs, NoRecXBs) ->
- CB = fun (Tx, F, XBs) ->
- dict:map(fun (XName, Bs) ->
- {ok, X} = rabbit_exchange:lookup(XName),
- rabbit_exchange:callback(X, F, [Tx, X, Bs])
- end, XBs)
- end,
- rabbit_misc:execute_mnesia_transaction(
- fun () -> ok end,
- fun (ok, Tx) ->
- CB(Tx, start, RecXBs),
- CB(Tx, add_bindings, NoRecXBs)
- end),
- ok.
+ QNames = rabbit_amqqueue:recover(),
+ rabbit_binding:recover(XNames, QNames).
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6267b823f2..34ed88bc12 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -16,7 +16,8 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
+-export([recover/0, stop/0, declare/5, delete_immediately/1, delete/3,
+ purge/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -57,7 +58,7 @@
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
--spec(start/0 :: () -> [rabbit_types:amqqueue()]).
+-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
@@ -157,7 +158,7 @@
%%----------------------------------------------------------------------------
-start() ->
+recover() ->
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
@@ -186,8 +187,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- [Q#amqqueue.name || Q <- Qs,
- gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == {new, Q}].
+ [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
+ gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 4779392084..5ac9c8718b 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -50,8 +50,8 @@
-opaque(deletions() :: dict()).
--spec(recover/2 :: ([rabbit_types:exchange()], [rabbit_types:amqqueue()]) ->
- [rabbit_types:binding()]).
+-spec(recover/2 :: ([rabbit_types:resource()], [rabbit_types:resource()]) ->
+ 'ok').
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
-spec(add/1 :: (rabbit_types:binding()) -> add_res()).
-spec(remove/1 :: (rabbit_types:binding()) -> remove_res()).
@@ -94,27 +94,38 @@
destination_name, destination_kind,
routing_key, arguments]).
-recover(XsL, QsL) ->
- Xs = sets:from_list(XsL),
- Qs = sets:from_list(QsL),
- rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- case should_recover(B, Xs, Qs) of
- true -> {_, Rev} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route, Route, write),
- ok = mnesia:write(rabbit_reverse_route, Rev, write),
- [B | Acc];
- false -> Acc
- end
- end, [], rabbit_durable_route).
+recover(XNames, QNames) ->
+ XNameSet = sets:from_list(XNames),
+ QNameSet = sets:from_list(QNames),
+ XBs = rabbit_misc:table_fold(
+ fun (Route = #route{binding = B = #binding{source = Src}}, Acc) ->
+ case should_recover(B, XNameSet, QNameSet) of
+ true -> {_, Rev} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route, Route, write),
+ ok = mnesia:write(rabbit_reverse_route, Rev,
+ write),
+ rabbit_misc:dict_cons(Src, B, Acc);
+ false -> Acc
+ end
+ end, dict:new(), rabbit_durable_route),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok end,
+ fun (ok, Tx) ->
+ dict:map(fun (XName, Bindings) ->
+ {ok, X} = rabbit_exchange:lookup(XName),
+ rabbit_exchange:callback(X, add_bindings,
+ [Tx, X, Bindings])
+ end, XBs)
+ end),
+ ok.
-should_recover(B = #binding{destination = Dest = #resource{ kind = Kind }},
- XNames, QNames) ->
+should_recover(B = #binding{destination = Dst = #resource{ kind = Kind }},
+ XNameSet, QNameSet) ->
case mnesia:read({rabbit_route, B}) of
- [] -> sets:is_element(Dest, case Kind of
- exchange -> XNames;
- queue -> QNames
- end);
+ [] -> sets:is_element(Dst, case Kind of
+ exchange -> XNameSet;
+ queue -> QNameSet
+ end);
_ -> false
end.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e05a881207..7268b15de0 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-type(type() :: atom()).
-type(fun_name() :: atom()).
--spec(recover/0 :: () -> 'ok').
+-spec(recover/0 :: () -> [rabbit_types:resource()]).
-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
@@ -83,14 +83,20 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
- rabbit_misc:table_fold(
- fun (X = #exchange{name = XName}, Acc) ->
- case mnesia:read({rabbit_exchange, XName}) of
- [] -> ok = mnesia:write(rabbit_exchange, X, write),
- [XName | Acc];
- [_] -> Acc
- end
- end, [], rabbit_durable_exchange).
+ Xs = rabbit_misc:table_fold(
+ fun (X = #exchange{name = XName}, Acc) ->
+ case mnesia:read({rabbit_exchange, XName}) of
+ [] -> ok = mnesia:write(rabbit_exchange, X, write),
+ [X | Acc];
+ [_] -> Acc
+ end
+ end, [], rabbit_durable_exchange),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok end,
+ fun (ok, Tx) ->
+ [rabbit_exchange:callback(X, create, [Tx, X]) || X <- Xs]
+ end),
+ [XName || #exchange{name = XName} <- Xs].
callback(#exchange{type = XType}, Fun, Args) ->
apply(type_to_module(XType), Fun, Args).
@@ -120,7 +126,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = (type_to_module(Type)):start(Tx, Exchange, []),
+ ok = (type_to_module(Type)):create(Tx, Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index ad08eb8646..0fede0bef2 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -27,7 +27,7 @@ behaviour_info(callbacks) ->
{validate, 1},
%% called after declaration and recovery
- {start, 3},
+ {create, 2},
%% called after exchange (auto)deletion.
{delete, 3},
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 1658c9f849..200c299727 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, start/3, delete/3,
+-export([validate/1, create/2, delete/3,
add_bindings/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -40,7 +40,7 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
-start(_Tx, _X, _Bs) -> ok.
+create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_bindings(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 83afdd717d..6256894984 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, start/3, delete/3, add_bindings/3,
+-export([validate/1, create/2, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -39,7 +39,7 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
-start(_Tx, _X, _Bs) -> ok.
+create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_bindings(_Tx, _X, _Bs) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 0fe8404fb7..258e785ade 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, start/3, delete/3, add_bindings/3,
+-export([validate/1, create/2, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -113,7 +113,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
validate(_X) -> ok.
-start(_Tx, _X, _Bs) -> ok.
+create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_bindings(_Tx, _X, _Bs) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 52f468ee1b..efa5fb52e2 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, start/3, delete/3, add_bindings/3,
+-export([validate/1, create/2, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -48,12 +48,7 @@ route(#exchange{name = X},
validate(_X) -> ok.
-start(true, _X, Bs) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
- end);
-start(false, _X, _Bs) ->
+create(_Tx, _X) ->
ok.
delete(true, #exchange{name = X}, _Bs) ->