diff options
| author | Michael Bridgen <mikeb@lshift.net> | 2010-01-15 17:18:03 +0000 |
|---|---|---|
| committer | Michael Bridgen <mikeb@lshift.net> | 2010-01-15 17:18:03 +0000 |
| commit | a34e9313713a20eebaacc564977323c27708cbca (patch) | |
| tree | 48eba1e4cf72bb5faf6c80a636cb8611bdc3e4df /src | |
| parent | 27c7662298ef60b2406090d764256ac55fd2b850 (diff) | |
| download | rabbitmq-server-git-a34e9313713a20eebaacc564977323c27708cbca.tar.gz | |
Change to different exchange callbacks. Requires a bit more
calculation in delete_queue_bindings and recover to collect the
bindings.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_exchange.erl | 158 | ||||
| -rw-r--r-- | src/rabbit_exchange_behaviour.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 8 |
6 files changed, 100 insertions, 96 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index fab53c4bb3..28585022f0 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -95,23 +95,38 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - [begin - #exchange{ type = Type } = X, - Type:recover(X) - end || X <- - rabbit_misc:table_fold( - fun(Exchange, Acc) -> ok = mnesia:write(rabbit_exchange, - Exchange, write), - [Exchange | Acc] - end, [], rabbit_durable_exchange)], - ok = rabbit_misc:table_fold( - fun(Route, ok) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - ok - end, ok, rabbit_durable_route). + Exs = rabbit_misc:table_fold( + fun(Exchange, Acc) -> ok = mnesia:write(rabbit_exchange, + Exchange, write), + [Exchange | Acc] + end, [], rabbit_durable_exchange), + Bs = rabbit_misc:table_fold( + fun(Route = #route{ binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route), + recover_with_bindings(Bs, Exs), + ok. + +recover_with_bindings(Bs, Exs) -> + recover_with_bindings( + lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#exchange.name, Exs), []). + +recover_with_bindings([B = #binding{exchange_name = N} | Rest], + Xs = [#exchange{ name = Name} | _], + Bindings) + when N =:= Name -> + recover_with_bindings(Rest, Xs, [B | Bindings]); +recover_with_bindings(Bs, [X = #exchange{ type = Type } | Xs], Bindings) -> + Type:recover(X, Bindings), + recover_with_bindings(Bs, Xs, []); +recover_with_bindings([], [], []) -> + ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -136,7 +151,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> end end) of {new, X} -> - ok = Type:init(X), + ok = Type:create(X), X; {existing, X} -> X; @@ -257,13 +272,13 @@ delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), - ok = delete_forward_routes(Route) + ok = delete_forward_routes(Route), + Route#route.binding end || Route <- mnesia:match_object( rabbit_route, #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - write)], - ok. + write)]. delete_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_forward_routes/1). @@ -272,53 +287,51 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - Exchanges = exchanges_for_queue(QueueName), DeletedBindings = [begin - ok = FwdDeleteFun(reverse_route(Route)), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write), - Route#reverse_route.reverse_binding - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - MaybeDeletedExchanges = - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - maybe_auto_delete(X) - end || ExchangeName <- Exchanges], + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, ReverseRoute, write), + Route#route.binding + end || ReverseRoute <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + BindingsWithExchanges = cleanup_deleted_queue_bindings( + lists:keysort(#binding.exchange_name, DeletedBindings), + none, [], []), fun () -> - run_deleted_bindings_hooks(DeletedBindings, MaybeDeletedExchanges) + lists:foreach(fun ({{deleted, X = #exchange{ type = Type}}, + Bs}) -> + Type:delete(X, Bs); + ({{_, X = #exchange{ type = Type }}, + Bs})-> + [Type:delete_binding(X, B) + || B <- Bs] + end, BindingsWithExchanges) end. -%% This only works because we pass all the exchanges involved, -%% whether or not they were deleted, and we can have them in the same -%% order as the exchanges of the deleted bindings. -run_deleted_bindings_hooks(Bindings, Exchanges) -> - SortedBindings = lists:keysort(#binding.exchange_name, Bindings), - SortedExchanges = lists:keysort(#exchange.name, Exchanges), - run_deleted_bindings_hooks1(SortedBindings, SortedExchanges). - -run_deleted_bindings_hooks1([], Exchanges) -> - [begin - Type = X#exchange.type, - Type:delete(X) - end || {Maybe, X} <- Exchanges, Maybe == deleted]; -run_deleted_bindings_hooks1( - [B = #binding{ exchange_name = BName } | Rest], - Exchanges = [X = {_, #exchange{ name = XName, type = Type }} | _]) - when BName =:= XName -> - Type:delete_binding(X, B), - run_deleted_bindings_hooks1(Rest, Exchanges); -run_deleted_bindings_hooks1(Bindings, - [X = {Maybe, #exchange{ type = Type }} | Rest]) -> - case Maybe of - deleted -> Type:delete(X); - _ -> ok - end, - run_deleted_bindings_hooks1(Bindings, Rest). +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% cleanup_deleted_queue_bindings1) works properly. +cleanup_deleted_queue_bindings([], ExchangeName, Bindings, Acc) -> + cleanup_deleted_queue_bindings1(ExchangeName, Bindings, Acc); +cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], + ExchangeName, Bindings, Acc) + when N =:= ExchangeName -> + cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings], Acc); +cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], + ExchangeName, Bindings, Acc) -> + NewAcc = cleanup_deleted_queue_bindings1(ExchangeName, Bindings, Acc), + cleanup_deleted_queue_bindings(Rest, N, [B], NewAcc). + +cleanup_deleted_queue_bindings1(none, [], Acc) -> + Acc; +cleanup_deleted_queue_bindings1(ExchangeName, Bindings, Acc) -> + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + [{maybe_auto_delete(X), Bindings} | Acc]. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -327,15 +340,6 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). -exchanges_for_queue(QueueName) -> - MatchHead = reverse_route( - #route{binding = #binding{exchange_name = '$1', - queue_name = QueueName, - _ = '_'}}), - sets:to_list( - sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). - contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -409,7 +413,7 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> end) of {{deleted, X = #exchange{ type = Type }}, B} -> Type:delete_binding(X, B), - Type:delete_exchange(X), + Type:delete(X), ok; {{no_delete, X = #exchange{ type = Type }}, B} -> Type:delete_binding(X, B), @@ -488,8 +492,8 @@ delete(ExchangeName, IfUnused) -> true -> fun unconditional_delete/1 end, case call_with_exchange(ExchangeName, Fun) of - {deleted, X = #exchange{ type = Type }} -> - Type:delete(X), + {deleted, X = #exchange{ type = Type }, Bs} -> + Type:delete(X, Bs), ok; Err -> Err end. @@ -514,10 +518,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> end. unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - ok = delete_exchange_bindings(ExchangeName), + Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}), - {deleted, Exchange}. + {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl index 58da4041d4..66ebe3838f 100644 --- a/src/rabbit_exchange_behaviour.erl +++ b/src/rabbit_exchange_behaviour.erl @@ -39,9 +39,9 @@ behaviour_info(callbacks) -> {publish, 2}, {validate, 1}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} - {init, 1}, %% called after declaration when previously absent - {recover, 1}, %% called when recovering - {delete, 1}, %% called after exchange deletion + {create, 1}, %% called after declaration when previously absent + {recover, 2}, %% called when recovering + {delete, 2}, %% called after exchange deletion {add_binding, 2}, %% called after a binding has been added {delete_binding, 2} %% called after a binding has been deleted ]; diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index dedd5b694c..ed646fc27b 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]). +-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -47,8 +47,8 @@ publish(#exchange{name = Name}, rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery). validate(_X) -> ok. -init(_X) -> ok. -recover(_X) -> ok. -delete(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index c4cae9e5af..fda40784bd 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]). +-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -46,8 +46,8 @@ publish(#exchange{name = Name}, Delivery) -> rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). validate(_X) -> ok. -init(_X) -> ok. -recover(_X) -> ok. -delete(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 40f905ab91..88b88019fd 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -36,7 +36,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]). +-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -ifdef(use_specs). @@ -121,8 +121,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). validate(_X) -> ok. -init(_X) -> ok. -recover(_X) -> ok. -delete(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. delete_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index b5eac0defe..4bb3f4cb24 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([validate/1, init/1, recover/1, delete/1, add_binding/2, delete_binding/2]). +-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -export([topic_matches/2]). @@ -84,8 +84,8 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). validate(_X) -> ok. -init(_X) -> ok. -recover(_X) -> ok. -delete(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. delete_binding(_X, _B) -> ok. |
