summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_exchange.erl93
1 files changed, 53 insertions, 40 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 28585022f0..cf719a8d2e 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -134,28 +134,33 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
- %% Don't ignore the return value; we want to upset things if it
- %% isn't ok.
+ %% We want to upset things if it isn't ok; this is different from
+ %% the other hooks invocations, where we tend to ignore the return
+ %% value.
ok = Type:validate(Exchange),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, ExchangeName}) of
- [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
- if Durable ->
- ok = mnesia:write(rabbit_durable_exchange,
- Exchange, write);
- true -> ok
- end,
- {new, Exchange};
- [ExistingX] -> {existing, ExistingX}
+ [] ->
+ ok = mnesia:write(rabbit_exchange, Exchange, write),
+ if Durable ->
+ ok = mnesia:write(rabbit_durable_exchange,
+ Exchange, write);
+ true ->
+ ok
+ end,
+ {new, Exchange};
+ [ExistingX] ->
+ {existing, ExistingX}
end
end) of
{new, X} ->
- ok = Type:create(X),
+ Type:create(X),
X;
{existing, X} ->
X;
- Err -> Err
+ Err ->
+ Err
end.
typename_to_plugin_module(T) ->
@@ -291,25 +296,25 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
[begin
Route = reverse_route(ReverseRoute),
ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route, ReverseRoute, write),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
Route#route.binding
end || ReverseRoute <- mnesia:match_object(
rabbit_reverse_route,
reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
+ #route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
write)],
BindingsWithExchanges = cleanup_deleted_queue_bindings(
- lists:keysort(#binding.exchange_name, DeletedBindings),
+ lists:keysort(#binding.exchange_name,
+ DeletedBindings),
none, [], []),
fun () ->
- lists:foreach(fun ({{deleted, X = #exchange{ type = Type}},
- Bs}) ->
+ lists:foreach(fun ({{deleted, X = #exchange{ type = Type}}, Bs}) ->
Type:delete(X, Bs);
- ({{_, X = #exchange{ type = Type }},
- Bs})->
- [Type:delete_binding(X, B)
- || B <- Bs]
+ ({{_, X = #exchange{ type = Type }}, Bs}) ->
+ [Type:delete_binding(X, B) || B <- Bs]
end, BindingsWithExchanges)
end.
@@ -396,7 +401,8 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
Type:add_binding(Exchange, Binding);
{existing, _, _} ->
ok;
- Err = {error, _} -> Err
+ Err = {error, _} ->
+ Err
end.
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
@@ -405,10 +411,12 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- {maybe_auto_delete(X), B}
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ ok = sync_binding(B, Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B}
end
end) of
{{deleted, X = #exchange{ type = Type }}, B} ->
@@ -418,17 +426,19 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
{{no_delete, X = #exchange{ type = Type }}, B} ->
Type:delete_binding(X, B),
ok;
- Err -> Err
+ Err ->
+ Err
end.
binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
call_with_exchange_and_queue(
ExchangeName, QueueName,
fun (X, Q) ->
- Fun(X, Q, #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
+ Fun(X, Q, #binding{
+ exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = rabbit_misc:sort_field_table(Arguments)})
end).
sync_binding(Binding, Durable, Fun) ->
@@ -453,8 +463,8 @@ list_bindings(VHostPath) ->
rabbit_route,
#route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
+ _ = '_'},
+ _ = '_'})].
route_with_reverse(#route{binding = Binding}) ->
route_with_reverse(Binding);
@@ -473,9 +483,9 @@ reverse_binding(#reverse_binding{exchange_name = Exchange,
key = Key,
args = Args}) ->
#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
+ queue_name = Queue,
+ key = Key,
+ args = Args};
reverse_binding(#binding{exchange_name = Exchange,
queue_name = Queue,
@@ -495,7 +505,8 @@ delete(ExchangeName, IfUnused) ->
{deleted, X = #exchange{ type = Type }, Bs} ->
Type:delete(X, Bs),
ok;
- Err -> Err
+ Err ->
+ Err
end.
maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
@@ -504,7 +515,8 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
case conditional_delete(Exchange) of
{error, in_use} ->
{no_delete, Exchange};
- Other -> Other
+ Other ->
+ Other
end.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
@@ -512,7 +524,8 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
+ case contains(rabbit_route, Match) orelse
+ contains(rabbit_durable_route, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.