summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl47
-rw-r--r--src/rabbit_exchange.erl74
-rw-r--r--src/rabbit_misc.erl15
3 files changed, 70 insertions, 66 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6341cd2e18..016721d919 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -219,11 +219,10 @@ update_bindings(Q = #amqqueue{}, Spec,
add_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
% Since this calls straight through to rabbit_exchange,
% can this exported function be deleted from this module?
- F = fun() ->
- rabbit_exchange:add_binding(#binding{exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName}) end,
- rabbit_misc:execute_mnesia_transaction(F).
+ Binding = #binding{exchange_name = ExchangeName,
+ key = RoutingKey,
+ queue_name = QueueName},
+ rabbit_misc:execute_mnesia_transaction(fun rabbit_exchange:add_binding/1, [Binding]).
% modify_bindings(
% QueueName, ExchangeName, RoutingKey, Arguments,
% fun (Q, _Spec) -> {ok, Q} end,
@@ -234,23 +233,27 @@ add_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
% end).
delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
- modify_bindings(
- QueueName, ExchangeName, RoutingKey, Arguments,
- fun (Q, Spec) -> update_bindings(
- Q, Spec,
- fun lists:delete/2,
- fun rabbit_exchange:delete_binding/2)
- end,
- fun (Q, Spec) ->
- %% the following is essentially a no-op, though crucially
- %% it produces {error, not_found} when the exchange does
- %% not exist.
- case rabbit_exchange:delete_binding(Spec, Q) of
- ok -> {error, binding_not_found};
- Other -> Other
- end
- end).
-
+ Binding = #binding{exchange_name = ExchangeName,
+ key = RoutingKey,
+ queue_name = QueueName},
+ rabbit_misc:execute_mnesia_transaction(fun rabbit_exchange:delete_binding/1, [Binding]).
+ % modify_bindings(
+ % QueueName, ExchangeName, RoutingKey, Arguments,
+ % fun (Q, Spec) -> update_bindings(
+ % Q, Spec,
+ % fun lists:delete/2,
+ % fun rabbit_exchange:delete_binding/2)
+ % end,
+ % fun (Q, Spec) ->
+ % %% the following is essentially a no-op, though crucially
+ % %% it produces {error, not_found} when the exchange does
+ % %% not exist.
+ % case rabbit_exchange:delete_binding(Spec, Q) of
+ % ok -> {error, binding_not_found};
+ % Other -> Other
+ % end
+ % end).
+
lookup(Name) ->
rabbit_misc:dirty_read({amqqueue, Name}).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index bdc0bf26a7..dfb7efa276 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -32,7 +32,7 @@
list_vhost_exchanges/1, list_exchange_bindings/1,
simple_publish/6, simple_publish/3,
route/2]).
--export([add_binding/1, delete_binding/2]).
+-export([add_binding/1, delete_binding/1]).
-export([delete/2]).
-export([check_type/1, assert_type/2, topic_matches/2]).
@@ -156,10 +156,10 @@ list_exchange_bindings(Name) ->
% arguments = Arguments},
% queue = QueueName} <- Handlers].
+% Maybe this named wrongly - it returns a route
bindings_for_exchange(Name) ->
- exit(bindings_for_exchange).
- % qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
- % element(1, K) == Name])).
+ qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} <- mnesia:table(route),
+ N == Name])).
empty_handlers() ->
[].
@@ -245,18 +245,19 @@ make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) ->
add_binding(Binding) ->
call_with_exchange_and_queue(
- Binding,
- fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- internal_add_binding(Binding)
- end
- end).
-
-delete_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName,
- % routing_key = RoutingKey},
- ,Q) ->
- exit(delete_binding).
+ Binding,
+ fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true ->
+ internal_add_binding(Binding)
+ end
+ end).
+
+delete_binding(Binding) ->
+ call_with_exchange_and_queue(
+ Binding,
+ fun (X,Q) -> ok = internal_delete_binding(Binding)
+ end).
% call_with_exchange(
% ExchangeName,
% fun (X) -> ok = internal_delete_binding(
@@ -295,10 +296,14 @@ internal_add_binding(Binding) ->
ok = mnesia:write(Reverse).
%% Must run within a transaction.
-internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) ->
- BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
- remove_handler_from_binding(BindingKey, Handler),
- ok.
+internal_delete_binding(Binding) ->
+ % This is copy and paste from the function above
+ Forwards = #route{ binding = Binding },
+ Reverse = #reverse_route{ reverse_binding = reverse_binding(Binding) },
+ ok = mnesia:delete_object(Forwards),
+ ok = mnesia:delete_object(Reverse).
+
+
%% Must run within a transaction.
add_handler_to_binding(BindingKey, Handler) ->
@@ -383,24 +388,13 @@ internal_delete(ExchangeName, _IfUnused = true) ->
internal_delete(ExchangeName, false) ->
do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)).
-forcibly_remove_handlers(Handlers) ->
- exit(forcibly_remove_handlers).
- % lists:foreach(
- % fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
- % ok = rabbit_amqqueue:binding_forcibly_removed(
- % BindingSpec, QueueName)
- % end, Handlers),
- % ok.
-
+% Don't know if iterating over a list in process memory is cool
+% Maybe we should iterate over the DB cursor?
do_internal_delete(ExchangeName, Bindings) ->
- exit(do_internal_delete).
- % case mnesia:wread({exchange, ExchangeName}) of
- % [] -> {error, not_found};
- % _ ->
- % lists:foreach(fun (#binding{key = K, handlers = H}) ->
- % ok = forcibly_remove_handlers(H),
- % ok = mnesia:delete({binding, K})
- % end, Bindings),
- % ok = mnesia:delete({durable_exchanges, ExchangeName}),
- % ok = mnesia:delete({exchange, ExchangeName})
- % end.
+ case mnesia:wread({exchange, ExchangeName}) of
+ [] -> {error, not_found};
+ _ ->
+ lists:foreach(fun (B) -> ok = mnesia:delete_object(B) end, Bindings),
+ ok = mnesia:delete({durable_exchanges, ExchangeName}),
+ ok = mnesia:delete({exchange, ExchangeName})
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c78a4ec38b..6be301e3d5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -35,7 +35,7 @@
-export([enable_cover/0, report_cover/0]).
-export([with_exit_handler/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
--export([execute_mnesia_transaction/1]).
+-export([execute_mnesia_transaction/1, execute_mnesia_transaction/2]).
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
@@ -223,14 +223,21 @@ with_vhost(VHostPath, Thunk) ->
with_user_and_vhost(Username, VHostPath, Thunk) ->
with_user(Username, with_vhost(VHostPath, Thunk)).
+
+%% Making this a sync_transaction allows us to use dirty_read
+%% elsewhere and get a consistent result even when that read
+%% executes on a different node.
execute_mnesia_transaction(TxFun) ->
- %% Making this a sync_transaction allows us to use dirty_read
- %% elsewhere and get a consistent result even when that read
- %% executes on a different node.
case mnesia:sync_transaction(TxFun) of
{atomic, Result} -> Result;
{aborted, Reason} -> throw({error, Reason})
end.
+
+execute_mnesia_transaction(TxFun, Args) ->
+ case mnesia:sync_transaction(TxFun, Args) of
+ {atomic, Result} -> Result;
+ {aborted, Reason} -> throw({error, Reason})
+ end.
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).