diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 15 |
3 files changed, 70 insertions, 66 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6341cd2e18..016721d919 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -219,11 +219,10 @@ update_bindings(Q = #amqqueue{}, Spec, add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> % Since this calls straight through to rabbit_exchange, % can this exported function be deleted from this module? - F = fun() -> - rabbit_exchange:add_binding(#binding{exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName}) end, - rabbit_misc:execute_mnesia_transaction(F). + Binding = #binding{exchange_name = ExchangeName, + key = RoutingKey, + queue_name = QueueName}, + rabbit_misc:execute_mnesia_transaction(fun rabbit_exchange:add_binding/1, [Binding]). % modify_bindings( % QueueName, ExchangeName, RoutingKey, Arguments, % fun (Q, _Spec) -> {ok, Q} end, @@ -234,23 +233,27 @@ add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> % end). delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun lists:delete/2, - fun rabbit_exchange:delete_binding/2) - end, - fun (Q, Spec) -> - %% the following is essentially a no-op, though crucially - %% it produces {error, not_found} when the exchange does - %% not exist. - case rabbit_exchange:delete_binding(Spec, Q) of - ok -> {error, binding_not_found}; - Other -> Other - end - end). - + Binding = #binding{exchange_name = ExchangeName, + key = RoutingKey, + queue_name = QueueName}, + rabbit_misc:execute_mnesia_transaction(fun rabbit_exchange:delete_binding/1, [Binding]). + % modify_bindings( + % QueueName, ExchangeName, RoutingKey, Arguments, + % fun (Q, Spec) -> update_bindings( + % Q, Spec, + % fun lists:delete/2, + % fun rabbit_exchange:delete_binding/2) + % end, + % fun (Q, Spec) -> + % %% the following is essentially a no-op, though crucially + % %% it produces {error, not_found} when the exchange does + % %% not exist. + % case rabbit_exchange:delete_binding(Spec, Q) of + % ok -> {error, binding_not_found}; + % Other -> Other + % end + % end). + lookup(Name) -> rabbit_misc:dirty_read({amqqueue, Name}). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index bdc0bf26a7..dfb7efa276 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -32,7 +32,7 @@ list_vhost_exchanges/1, list_exchange_bindings/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/1, delete_binding/2]). +-export([add_binding/1, delete_binding/1]). -export([delete/2]). -export([check_type/1, assert_type/2, topic_matches/2]). @@ -156,10 +156,10 @@ list_exchange_bindings(Name) -> % arguments = Arguments}, % queue = QueueName} <- Handlers]. +% Maybe this named wrongly - it returns a route bindings_for_exchange(Name) -> - exit(bindings_for_exchange). - % qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), - % element(1, K) == Name])). + qlc:e(qlc:q([R || R = #route{binding = #binding{exchange_name = N}} <- mnesia:table(route), + N == Name])). empty_handlers() -> []. @@ -245,18 +245,19 @@ make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> add_binding(Binding) -> call_with_exchange_and_queue( - Binding, - fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - internal_add_binding(Binding) - end - end). - -delete_binding(BindingSpec %= #binding_spec{exchange_name = ExchangeName, - % routing_key = RoutingKey}, - ,Q) -> - exit(delete_binding). + Binding, + fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + internal_add_binding(Binding) + end + end). + +delete_binding(Binding) -> + call_with_exchange_and_queue( + Binding, + fun (X,Q) -> ok = internal_delete_binding(Binding) + end). % call_with_exchange( % ExchangeName, % fun (X) -> ok = internal_delete_binding( @@ -295,10 +296,14 @@ internal_add_binding(Binding) -> ok = mnesia:write(Reverse). %% Must run within a transaction. -internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - remove_handler_from_binding(BindingKey, Handler), - ok. +internal_delete_binding(Binding) -> + % This is copy and paste from the function above + Forwards = #route{ binding = Binding }, + Reverse = #reverse_route{ reverse_binding = reverse_binding(Binding) }, + ok = mnesia:delete_object(Forwards), + ok = mnesia:delete_object(Reverse). + + %% Must run within a transaction. add_handler_to_binding(BindingKey, Handler) -> @@ -383,24 +388,13 @@ internal_delete(ExchangeName, _IfUnused = true) -> internal_delete(ExchangeName, false) -> do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)). -forcibly_remove_handlers(Handlers) -> - exit(forcibly_remove_handlers). - % lists:foreach( - % fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> - % ok = rabbit_amqqueue:binding_forcibly_removed( - % BindingSpec, QueueName) - % end, Handlers), - % ok. - +% Don't know if iterating over a list in process memory is cool +% Maybe we should iterate over the DB cursor? do_internal_delete(ExchangeName, Bindings) -> - exit(do_internal_delete). - % case mnesia:wread({exchange, ExchangeName}) of - % [] -> {error, not_found}; - % _ -> - % lists:foreach(fun (#binding{key = K, handlers = H}) -> - % ok = forcibly_remove_handlers(H), - % ok = mnesia:delete({binding, K}) - % end, Bindings), - % ok = mnesia:delete({durable_exchanges, ExchangeName}), - % ok = mnesia:delete({exchange, ExchangeName}) - % end. + case mnesia:wread({exchange, ExchangeName}) of + [] -> {error, not_found}; + _ -> + lists:foreach(fun (B) -> ok = mnesia:delete_object(B) end, Bindings), + ok = mnesia:delete({durable_exchanges, ExchangeName}), + ok = mnesia:delete({exchange, ExchangeName}) + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c78a4ec38b..6be301e3d5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -35,7 +35,7 @@ -export([enable_cover/0, report_cover/0]). -export([with_exit_handler/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). --export([execute_mnesia_transaction/1]). +-export([execute_mnesia_transaction/1, execute_mnesia_transaction/2]). -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). @@ -223,14 +223,21 @@ with_vhost(VHostPath, Thunk) -> with_user_and_vhost(Username, VHostPath, Thunk) -> with_user(Username, with_vhost(VHostPath, Thunk)). + +%% Making this a sync_transaction allows us to use dirty_read +%% elsewhere and get a consistent result even when that read +%% executes on a different node. execute_mnesia_transaction(TxFun) -> - %% Making this a sync_transaction allows us to use dirty_read - %% elsewhere and get a consistent result even when that read - %% executes on a different node. case mnesia:sync_transaction(TxFun) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. + +execute_mnesia_transaction(TxFun, Args) -> + case mnesia:sync_transaction(TxFun, Args) of + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) + end. ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). |
