diff options
| author | Michael Bridgen <mikeb@lshift.net> | 2010-01-13 17:53:33 +0000 |
|---|---|---|
| committer | Michael Bridgen <mikeb@lshift.net> | 2010-01-13 17:53:33 +0000 |
| commit | 2778cb7181909ab5efbd46d5191ade2b85718c66 (patch) | |
| tree | 11e9ee038a5da22814546b065bd0facf4d172a3a /src | |
| parent | 2470679859e26119837fcc9a7704be77055b3456 (diff) | |
| download | rabbitmq-server-git-2778cb7181909ab5efbd46d5191ade2b85718c66.tar.gz | |
Run the hooks outside and immediately after the mnesia transactions,
if something newsworthy happened. Mostly this is just a case of
changing the thunk run in the transaction so it returns an indication
of what the result was (e.g., a new exchange).
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 60 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 189 |
2 files changed, 165 insertions, 84 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a5e82d714..e94b55c6c4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -309,33 +309,43 @@ unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> - ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - ok - end - end). + case + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [_] -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, + %% as decided by rabbit_exchange, after the transaction. + rabbit_exchange:delete_queue_bindings(QueueName) + end + end) of + Err = {error, _} -> Err; + PostHook -> + PostHook(), + ok + end. on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:fold( - fun (QueueName, Acc) -> - ok = rabbit_exchange:delete_transient_queue_bindings( - QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - Acc - end, - ok, - qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end). + [Hook() || + Hook <- + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:fold( + fun (QueueName, Acc) -> + Post = rabbit_exchange:delete_transient_queue_bindings( + QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + [Post | Acc] + end, + [], + qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end)], + ok. pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 495fc4b3f3..0e7defab0c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -79,8 +79,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). +-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -113,20 +113,26 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, - ok = Type:declare(Exchange), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of - [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), - if Durable -> - ok = mnesia:write(rabbit_durable_exchange, - Exchange, write); - true -> ok - end, - Exchange; - [ExistingX] -> ExistingX - end - end). + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), + if Durable -> + ok = mnesia:write(rabbit_durable_exchange, + Exchange, write); + true -> ok + end, + {new, Exchange}; + [ExistingX] -> {existing, ExistingX} + end + end) of + {new, X} -> + ok = Type:declare(X), + X; + {existing, X} -> + X; + Err -> Err + end. typename_to_plugin_module(T) -> case rabbit_exchange_type:lookup_module(T) of @@ -258,20 +264,52 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, FwdDeleteFun) -> Exchanges = exchanges_for_queue(QueueName), + DeletedBindings = + [begin + ok = FwdDeleteFun(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write), + Route#reverse_route.reverse_binding + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + MaybeDeletedExchanges = + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + fun () -> + run_deleted_bindings_hooks(DeletedBindings, MaybeDeletedExchanges) + end. + +%% This only works because we pass all the exchanges involved, +%% whether or not they were deleted, and we can have them in the same +%% order as the exchanges of the deleted bindings. +run_deleted_bindings_hooks(Bindings, Exchanges) -> + SortedBindings = lists:keysort(#binding.exchange_name, Bindings), + SortedExchanges = lists:keysort(#exchange.name, Exchanges), + run_deleted_bindings_hooks1(SortedBindings, SortedExchanges). + +run_deleted_bindings_hooks1([], Exchanges) -> [begin - ok = FwdDeleteFun(reverse_route(Route)), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write) - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - ok = maybe_auto_delete(X) - end || ExchangeName <- Exchanges], - ok. + Type = X#exchange.type, + Type:delete(X) + end || {Maybe, X} <- Exchanges, Maybe == deleted]; +run_deleted_bindings_hooks1( + [B = #binding{ exchange_name = BName } | Rest], + Exchanges = [X = {_, #exchange{ name = XName, type = Type }} | _]) + when BName =:= XName -> + Type:delete_binding(X, B), + run_deleted_bindings_hooks1(Rest, Exchanges); +run_deleted_bindings_hooks1(Bindings, + [X = {Maybe, #exchange{ type = Type }} | Rest]) -> + case Maybe of + deleted -> Type:delete(X); + _ -> ok + end, + run_deleted_bindings_hooks1(Bindings, Rest). delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -325,28 +363,50 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + case mnesia:read(rabbit_route, B) of + [] -> + sync_binding(B, Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} + end + end + end) of + {new, Exchange = #exchange{ type = Type }, Binding} -> + Type:add_binding(Exchange, Binding); + {existing, _, _} -> + ok; + Err = {error, _} -> Err + end. delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + _ -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B} + end + end) of + {{deleted, X = #exchange{ type = Type }}, B} -> + Type:delete_binding(X, B), + Type:delete_exchange(X), + ok; + {{no_delete, X = #exchange{ type = Type }}, B} -> + Type:delete_binding(X, B), + ok; + Err -> Err + end. binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( @@ -413,16 +473,26 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. -delete(ExchangeName, _IfUnused = true) -> - call_with_exchange(ExchangeName, fun conditional_delete/1); -delete(ExchangeName, _IfUnused = false) -> - call_with_exchange(ExchangeName, fun unconditional_delete/1). +delete(ExchangeName, IfUnused) -> + Fun = if + IfUnused -> fun conditional_delete/1; + true -> fun unconditional_delete/1 + end, + case call_with_exchange(ExchangeName, Fun) of + {deleted, X = #exchange{ type = Type }} -> + Type:delete(X), + ok; + Err -> Err + end. -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; +maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> + {no_delete, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - conditional_delete(Exchange), - ok. + case conditional_delete(Exchange) of + {error, in_use} -> + {no_delete, Exchange}; + Other -> Other + end. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, @@ -434,10 +504,11 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> true -> {error, in_use} end. -unconditional_delete(#exchange{name = ExchangeName}) -> +unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}). + ok = mnesia:delete({rabbit_exchange, ExchangeName}), + {deleted, Exchange}. %%---------------------------------------------------------------------------- %% EXTENDED API |
