summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@lshift.net>2010-01-13 17:53:33 +0000
committerMichael Bridgen <mikeb@lshift.net>2010-01-13 17:53:33 +0000
commit2778cb7181909ab5efbd46d5191ade2b85718c66 (patch)
tree11e9ee038a5da22814546b065bd0facf4d172a3a /src
parent2470679859e26119837fcc9a7704be77055b3456 (diff)
downloadrabbitmq-server-git-2778cb7181909ab5efbd46d5191ade2b85718c66.tar.gz
Run the hooks outside and immediately after the mnesia transactions,
if something newsworthy happened. Mostly this is just a case of changing the thunk run in the transaction so it returns an indication of what the result was (e.g., a new exchange).
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl60
-rw-r--r--src/rabbit_exchange.erl189
2 files changed, 165 insertions, 84 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a5e82d714..e94b55c6c4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -309,33 +309,43 @@ unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {unblock, ChPid}).
internal_delete(QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] ->
- ok = rabbit_exchange:delete_queue_bindings(QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
- ok
- end
- end).
+ case
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [_] ->
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% we want to execute some things,
+ %% as decided by rabbit_exchange, after the transaction.
+ rabbit_exchange:delete_queue_bindings(QueueName)
+ end
+ end) of
+ Err = {error, _} -> Err;
+ PostHook ->
+ PostHook(),
+ ok
+ end.
on_node_down(Node) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:fold(
- fun (QueueName, Acc) ->
- ok = rabbit_exchange:delete_transient_queue_bindings(
- QueueName),
- ok = mnesia:delete({rabbit_queue, QueueName}),
- Acc
- end,
- ok,
- qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end).
+ [Hook() ||
+ Hook <-
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:fold(
+ fun (QueueName, Acc) ->
+ Post = rabbit_exchange:delete_transient_queue_bindings(
+ QueueName),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ [Post | Acc]
+ end,
+ [],
+ qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end)],
+ ok.
pseudo_queue(QueueName, Pid) ->
#amqqueue{name = QueueName,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 495fc4b3f3..0e7defab0c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -79,8 +79,8 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())).
-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -113,20 +113,26 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
- ok = Type:declare(Exchange),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_exchange, ExchangeName}) of
- [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
- if Durable ->
- ok = mnesia:write(rabbit_durable_exchange,
- Exchange, write);
- true -> ok
- end,
- Exchange;
- [ExistingX] -> ExistingX
- end
- end).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
+ if Durable ->
+ ok = mnesia:write(rabbit_durable_exchange,
+ Exchange, write);
+ true -> ok
+ end,
+ {new, Exchange};
+ [ExistingX] -> {existing, ExistingX}
+ end
+ end) of
+ {new, X} ->
+ ok = Type:declare(X),
+ X;
+ {existing, X} ->
+ X;
+ Err -> Err
+ end.
typename_to_plugin_module(T) ->
case rabbit_exchange_type:lookup_module(T) of
@@ -258,20 +264,52 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
+ DeletedBindings =
+ [begin
+ ok = FwdDeleteFun(reverse_route(Route)),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
+ Route#reverse_route.reverse_binding
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ MaybeDeletedExchanges =
+ [begin
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ maybe_auto_delete(X)
+ end || ExchangeName <- Exchanges],
+ fun () ->
+ run_deleted_bindings_hooks(DeletedBindings, MaybeDeletedExchanges)
+ end.
+
+%% This only works because we pass all the exchanges involved,
+%% whether or not they were deleted, and we can have them in the same
+%% order as the exchanges of the deleted bindings.
+run_deleted_bindings_hooks(Bindings, Exchanges) ->
+ SortedBindings = lists:keysort(#binding.exchange_name, Bindings),
+ SortedExchanges = lists:keysort(#exchange.name, Exchanges),
+ run_deleted_bindings_hooks1(SortedBindings, SortedExchanges).
+
+run_deleted_bindings_hooks1([], Exchanges) ->
[begin
- ok = FwdDeleteFun(reverse_route(Route)),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- [begin
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- ok = maybe_auto_delete(X)
- end || ExchangeName <- Exchanges],
- ok.
+ Type = X#exchange.type,
+ Type:delete(X)
+ end || {Maybe, X} <- Exchanges, Maybe == deleted];
+run_deleted_bindings_hooks1(
+ [B = #binding{ exchange_name = BName } | Rest],
+ Exchanges = [X = {_, #exchange{ name = XName, type = Type }} | _])
+ when BName =:= XName ->
+ Type:delete_binding(X, B),
+ run_deleted_bindings_hooks1(Rest, Exchanges);
+run_deleted_bindings_hooks1(Bindings,
+ [X = {Maybe, #exchange{ type = Type }} | Rest]) ->
+ case Maybe of
+ deleted -> Type:delete(X);
+ _ -> ok
+ end,
+ run_deleted_bindings_hooks1(Bindings, Rest).
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -325,28 +363,50 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3)
- end
- end).
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true ->
+ case mnesia:read(rabbit_route, B) of
+ [] ->
+ sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
+ end
+ end
+ end) of
+ {new, Exchange = #exchange{ type = Type }, Binding} ->
+ Type:add_binding(Exchange, Binding);
+ {existing, _, _} ->
+ ok;
+ Err = {error, _} -> Err
+ end.
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
- binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- maybe_auto_delete(X)
- end
- end).
+ case binding_action(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B}
+ end
+ end) of
+ {{deleted, X = #exchange{ type = Type }}, B} ->
+ Type:delete_binding(X, B),
+ Type:delete_exchange(X),
+ ok;
+ {{no_delete, X = #exchange{ type = Type }}, B} ->
+ Type:delete_binding(X, B),
+ ok;
+ Err -> Err
+ end.
binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
call_with_exchange_and_queue(
@@ -413,16 +473,26 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
-delete(ExchangeName, _IfUnused = true) ->
- call_with_exchange(ExchangeName, fun conditional_delete/1);
-delete(ExchangeName, _IfUnused = false) ->
- call_with_exchange(ExchangeName, fun unconditional_delete/1).
+delete(ExchangeName, IfUnused) ->
+ Fun = if
+ IfUnused -> fun conditional_delete/1;
+ true -> fun unconditional_delete/1
+ end,
+ case call_with_exchange(ExchangeName, Fun) of
+ {deleted, X = #exchange{ type = Type }} ->
+ Type:delete(X),
+ ok;
+ Err -> Err
+ end.
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
+maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
+ {no_delete, Exchange};
maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- conditional_delete(Exchange),
- ok.
+ case conditional_delete(Exchange) of
+ {error, in_use} ->
+ {no_delete, Exchange};
+ Other -> Other
+ end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
@@ -434,10 +504,11 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(#exchange{name = ExchangeName}) ->
+unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}),
+ {deleted, Exchange}.
%%----------------------------------------------------------------------------
%% EXTENDED API