summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_exchange.erl44
1 files changed, 19 insertions, 25 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index bcd5499a6e..143fa48d01 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -85,7 +85,7 @@
%%----------------------------------------------------------------------------
recover() ->
- % These two functions share commonalities - maybe a refactoring target
+ % TODO: These two functions share commonalities - maybe a refactoring target
ok = recover_durable_exchanges(),
ok = recover_durable_routes(),
ok.
@@ -204,7 +204,7 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
route(#exchange{name = Name}, RoutingKey) ->
route_internal(Name, RoutingKey).
-% This returns a list of QPids to route to.
+% TODO: This returns a list of QPids to route to.
% Maybe this should be handled by a cursor instead.
% This routes directly to queues, avoiding any lookup of routes
route_internal(#resource{name = Name, virtual_host = VHostPath}, RoutingKey) ->
@@ -217,14 +217,14 @@ route_internal(#resource{name = Name, virtual_host = VHostPath}, RoutingKey) ->
fun() -> mnesia:select(route,[{MatchHead, Guards, ['$3']}])
end)).
-% This returns a list of QPids to route to.
+% TODO: This returns a list of QPids to route to.
% Maybe this should be handled by a cursor instead.
route_internal(Exchange, RoutingKey, MatchFun) ->
Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName,
queue_name = QName,
key = BindingKey}} <- mnesia:table(route),
ExchangeName == Exchange,
- % This causes a full table scan (see bug 19336)
+ % TODO: This causes a full table scan (see bug 19336)
MatchFun(BindingKey, RoutingKey)]),
lookup_qpids(mnesia:activity(async_dirty, fun() -> qlc:e(Query) end)).
@@ -237,15 +237,16 @@ lookup_qpids(Queues) ->
[], Set) end,
mnesia:activity(async_dirty,Fun).
-% Should all of the route and binding management not be refactored to it's own module
+% TODO: Should all of the route and binding management not be refactored to it's own module
% Especially seeing as unbind will have to be implemented for 0.91 ?
delete_routes(#amqqueue{name = Name}) ->
Binding = #binding{queue_name = Name, exchange_name = '_', key = '_'},
{Route, ReverseRoute} = route_with_reverse(Binding),
ok = mnesia:delete_object(Route),
- ok = mnesia:delete_object(ReverseRoute).
+ ok = mnesia:delete_object(ReverseRoute),
+ ok = mnesia:delete_object(durable_routes, Route, write).
-% Don't really like this double lookup
+% TODO: Don't really like this double lookup
% It seems very clunky
% Can this get refactored to to avoid the duplication of the lookup/1 function?
call_with_exchange_and_queue(#binding{exchange_name = Exchange,
@@ -280,7 +281,8 @@ add_binding(Binding) ->
fun (X, Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
- internal_add_binding(Binding, Q#amqqueue.durable)
+ ok = sync_binding(Binding, Q#amqqueue.durable,
+ fun mnesia:write/1, fun mnesia:write/3)
end
end).
@@ -288,8 +290,9 @@ add_binding(Binding) ->
delete_binding(Binding) ->
call_with_exchange_and_queue(
Binding,
- fun (X, Q) -> ok = internal_delete_binding(Binding, Q#amqqueue.durable),
- maybe_auto_delete(X)
+ fun (X, Q) -> ok = sync_binding(Binding, Q#amqqueue.durable,
+ fun mnesia:delete/1, fun mnesia:delete/3),
+ maybe_auto_delete(X)
end).
delete_bindings(Q = #amqqueue{}) ->
@@ -308,23 +311,14 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, queue_name = Queue
#reverse_binding{exchange_name = Exchange, key = Key, queue_name = Queue}.
%% Must run within a transaction.
-internal_add_binding(Binding, Durable) ->
- synchronize_durable_binding(Binding, write, Durable),
- [ok, ok] = [mnesia:write(R) || R <- tuple_to_list(route_with_reverse(Binding))],
+sync_binding(Binding, Durable, RouteSyncFun, DurableRouteSyncFun) ->
+ ok = case Durable of
+ true -> DurableRouteSyncFun(durable_routes, #route{binding = Binding}, write);
+ false -> ok
+ end,
+ [ok, ok] = [RouteSyncFun(R) || R <- tuple_to_list(route_with_reverse(Binding))],
ok.
-%% Must run within a transaction.
-internal_delete_binding(Binding, Durable) ->
- synchronize_durable_binding(Binding, delete, Durable),
- [ok,ok] = [mnesia:delete_object(R) || R <- tuple_to_list(route_with_reverse(Binding))],
- ok.
-
-synchronize_durable_binding(_, _, false) ->
- ok;
-%% Must run within a transaction.
-synchronize_durable_binding(Binding, Function, true) ->
- ok = mnesia:Function(durable_routes, #route{binding = Binding}, write).
-
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
route_with_reverse(Binding = #binding{}) ->