summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_exchange.erl158
-rw-r--r--src/rabbit_exchange_behaviour.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl8
-rw-r--r--src/rabbit_exchange_type_fanout.erl8
-rw-r--r--src/rabbit_exchange_type_headers.erl8
-rw-r--r--src/rabbit_exchange_type_topic.erl8
6 files changed, 100 insertions, 96 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index fab53c4bb3..28585022f0 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -95,23 +95,38 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- [begin
- #exchange{ type = Type } = X,
- Type:recover(X)
- end || X <-
- rabbit_misc:table_fold(
- fun(Exchange, Acc) -> ok = mnesia:write(rabbit_exchange,
- Exchange, write),
- [Exchange | Acc]
- end, [], rabbit_durable_exchange)],
- ok = rabbit_misc:table_fold(
- fun(Route, ok) -> {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- ok
- end, ok, rabbit_durable_route).
+ Exs = rabbit_misc:table_fold(
+ fun(Exchange, Acc) -> ok = mnesia:write(rabbit_exchange,
+ Exchange, write),
+ [Exchange | Acc]
+ end, [], rabbit_durable_exchange),
+ Bs = rabbit_misc:table_fold(
+ fun(Route = #route{ binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route,
+ Route, write),
+ ok = mnesia:write(rabbit_reverse_route,
+ ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route),
+ recover_with_bindings(Bs, Exs),
+ ok.
+
+recover_with_bindings(Bs, Exs) ->
+ recover_with_bindings(
+ lists:keysort(#binding.exchange_name, Bs),
+ lists:keysort(#exchange.name, Exs), []).
+
+recover_with_bindings([B = #binding{exchange_name = N} | Rest],
+ Xs = [#exchange{ name = Name} | _],
+ Bindings)
+ when N =:= Name ->
+ recover_with_bindings(Rest, Xs, [B | Bindings]);
+recover_with_bindings(Bs, [X = #exchange{ type = Type } | Xs], Bindings) ->
+ Type:recover(X, Bindings),
+ recover_with_bindings(Bs, Xs, []);
+recover_with_bindings([], [], []) ->
+ ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -136,7 +151,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
end
end) of
{new, X} ->
- ok = Type:init(X),
+ ok = Type:create(X),
X;
{existing, X} ->
X;
@@ -257,13 +272,13 @@ delete_exchange_bindings(ExchangeName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
reverse_route(Route), write),
- ok = delete_forward_routes(Route)
+ ok = delete_forward_routes(Route),
+ Route#route.binding
end || Route <- mnesia:match_object(
rabbit_route,
#route{binding = #binding{exchange_name = ExchangeName,
_ = '_'}},
- write)],
- ok.
+ write)].
delete_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_forward_routes/1).
@@ -272,53 +287,51 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- Exchanges = exchanges_for_queue(QueueName),
DeletedBindings =
[begin
- ok = FwdDeleteFun(reverse_route(Route)),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
- Route#reverse_route.reverse_binding
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- MaybeDeletedExchanges =
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route, ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ BindingsWithExchanges = cleanup_deleted_queue_bindings(
+ lists:keysort(#binding.exchange_name, DeletedBindings),
+ none, [], []),
fun () ->
- run_deleted_bindings_hooks(DeletedBindings, MaybeDeletedExchanges)
+ lists:foreach(fun ({{deleted, X = #exchange{ type = Type}},
+ Bs}) ->
+ Type:delete(X, Bs);
+ ({{_, X = #exchange{ type = Type }},
+ Bs})->
+ [Type:delete_binding(X, B)
+ || B <- Bs]
+ end, BindingsWithExchanges)
end.
-%% This only works because we pass all the exchanges involved,
-%% whether or not they were deleted, and we can have them in the same
-%% order as the exchanges of the deleted bindings.
-run_deleted_bindings_hooks(Bindings, Exchanges) ->
- SortedBindings = lists:keysort(#binding.exchange_name, Bindings),
- SortedExchanges = lists:keysort(#exchange.name, Exchanges),
- run_deleted_bindings_hooks1(SortedBindings, SortedExchanges).
-
-run_deleted_bindings_hooks1([], Exchanges) ->
- [begin
- Type = X#exchange.type,
- Type:delete(X)
- end || {Maybe, X} <- Exchanges, Maybe == deleted];
-run_deleted_bindings_hooks1(
- [B = #binding{ exchange_name = BName } | Rest],
- Exchanges = [X = {_, #exchange{ name = XName, type = Type }} | _])
- when BName =:= XName ->
- Type:delete_binding(X, B),
- run_deleted_bindings_hooks1(Rest, Exchanges);
-run_deleted_bindings_hooks1(Bindings,
- [X = {Maybe, #exchange{ type = Type }} | Rest]) ->
- case Maybe of
- deleted -> Type:delete(X);
- _ -> ok
- end,
- run_deleted_bindings_hooks1(Bindings, Rest).
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% cleanup_deleted_queue_bindings1) works properly.
+cleanup_deleted_queue_bindings([], ExchangeName, Bindings, Acc) ->
+ cleanup_deleted_queue_bindings1(ExchangeName, Bindings, Acc);
+cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest],
+ ExchangeName, Bindings, Acc)
+ when N =:= ExchangeName ->
+ cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings], Acc);
+cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest],
+ ExchangeName, Bindings, Acc) ->
+ NewAcc = cleanup_deleted_queue_bindings1(ExchangeName, Bindings, Acc),
+ cleanup_deleted_queue_bindings(Rest, N, [B], NewAcc).
+
+cleanup_deleted_queue_bindings1(none, [], Acc) ->
+ Acc;
+cleanup_deleted_queue_bindings1(ExchangeName, Bindings, Acc) ->
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ [{maybe_auto_delete(X), Bindings} | Acc].
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -327,15 +340,6 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
-exchanges_for_queue(QueueName) ->
- MatchHead = reverse_route(
- #route{binding = #binding{exchange_name = '$1',
- queue_name = QueueName,
- _ = '_'}}),
- sets:to_list(
- sets:from_list(
- mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
-
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -409,7 +413,7 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
end) of
{{deleted, X = #exchange{ type = Type }}, B} ->
Type:delete_binding(X, B),
- Type:delete_exchange(X),
+ Type:delete(X),
ok;
{{no_delete, X = #exchange{ type = Type }}, B} ->
Type:delete_binding(X, B),
@@ -488,8 +492,8 @@ delete(ExchangeName, IfUnused) ->
true -> fun unconditional_delete/1
end,
case call_with_exchange(ExchangeName, Fun) of
- {deleted, X = #exchange{ type = Type }} ->
- Type:delete(X),
+ {deleted, X = #exchange{ type = Type }, Bs} ->
+ Type:delete(X, Bs),
ok;
Err -> Err
end.
@@ -514,10 +518,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
end.
unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- ok = delete_exchange_bindings(ExchangeName),
+ Bindings = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- {deleted, Exchange}.
+ {deleted, Exchange, Bindings}.
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
index 58da4041d4..66ebe3838f 100644
--- a/src/rabbit_exchange_behaviour.erl
+++ b/src/rabbit_exchange_behaviour.erl
@@ -39,9 +39,9 @@ behaviour_info(callbacks) ->
{publish, 2},
{validate, 1}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
- {init, 1}, %% called after declaration when previously absent
- {recover, 1}, %% called when recovering
- {delete, 1}, %% called after exchange deletion
+ {create, 1}, %% called after declaration when previously absent
+ {recover, 2}, %% called when recovering
+ {delete, 2}, %% called after exchange deletion
{add_binding, 2}, %% called after a binding has been added
{delete_binding, 2} %% called after a binding has been deleted
];
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index dedd5b694c..ed646fc27b 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
description() ->
@@ -47,8 +47,8 @@ publish(#exchange{name = Name},
rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery).
validate(_X) -> ok.
-init(_X) -> ok.
-recover(_X) -> ok.
-delete(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index c4cae9e5af..fda40784bd 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
description() ->
@@ -46,8 +46,8 @@ publish(#exchange{name = Name}, Delivery) ->
rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
validate(_X) -> ok.
-init(_X) -> ok.
-recover(_X) -> ok.
-delete(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 40f905ab91..88b88019fd 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -36,7 +36,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
-ifdef(use_specs).
@@ -121,8 +121,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
validate(_X) -> ok.
-init(_X) -> ok.
-recover(_X) -> ok.
-delete(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index b5eac0defe..4bb3f4cb24 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
-export([topic_matches/2]).
@@ -84,8 +84,8 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
validate(_X) -> ok.
-init(_X) -> ok.
-recover(_X) -> ok.
-delete(_X) -> ok.
+create(_X) -> ok.
+recover(_X, _Bs) -> ok.
+delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
delete_binding(_X, _B) -> ok.