summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_binding.erl74
3 files changed, 45 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 381f733763..56e8cfc618 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -988,7 +988,7 @@ internal_delete(QueueName, ActingUser) ->
?INTERNAL_USER),
fun() ->
ok = T(),
- rabbit_core_metrics:queue_deleted(QueueName),
+ rabbit_core_metrics:queue_deleted(QueueName),
ok = rabbit_event:notify(queue_deleted,
[{name, QueueName},
{user_who_performed_action, ActingUser}])
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a3c8f99519..11f272073b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -311,8 +311,14 @@ terminate_delete(EmitStats, Reason,
fun() -> emit_stats(State) end);
true -> ok
end,
- %% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, ActingUser),
+ %% This try-catch block transforms throws to errors since throws are not
+ %% logged.
+ try
+ %% don't care if the internal delete doesn't return 'ok'.
+ rabbit_amqqueue:internal_delete(QName, ActingUser)
+ catch
+ {error, Reason} -> error(Reason)
+ end,
BQS1
end.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 7498d6b765..5e6cd48ce9 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -166,6 +166,8 @@ add(Binding, InnerFun, ActingUser) ->
fun (Src, Dst, B) ->
case rabbit_exchange:validate_binding(Src, B) of
ok ->
+ lock_resource(Src),
+ lock_resource(Dst),
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
@@ -184,6 +186,8 @@ add(Binding, InnerFun, ActingUser) ->
end, fun not_found_or_absent_errs/1).
add(Src, Dst, B, ActingUser) ->
+ lock_resource(Src),
+ lock_resource(Dst),
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
case (SrcDurable andalso DstDurable andalso
mnesia:read({rabbit_durable_route, B}) =/= []) of
@@ -206,6 +210,8 @@ remove(Binding, InnerFun, ActingUser) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
+ lock_resource(Src),
+ lock_resource(Dst),
case mnesia:read(rabbit_route, B, write) of
[] -> case mnesia:read(rabbit_durable_route, B, write) of
[] -> rabbit_misc:const(ok);
@@ -219,6 +225,8 @@ remove(Binding, InnerFun, ActingUser) ->
end, fun absent_errs_only/1).
remove(Src, Dst, B, ActingUser) ->
+ lock_resource(Src),
+ lock_resource(Dst),
ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
fun mnesia:delete_object/3),
Deletions = maybe_auto_delete(
@@ -303,12 +311,12 @@ has_for_source(SrcName) ->
contains(rabbit_semi_durable_route, Match).
remove_for_source(SrcName) ->
- lock_route_tables(),
+ lock_resource(SrcName),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
remove_routes(
lists:usort(
- mnesia:match_object(rabbit_route, Match, write) ++
- mnesia:match_object(rabbit_semi_durable_route, Match, write))).
+ mnesia:dirty_match_object(rabbit_route, Match) ++
+ mnesia:dirty_match_object(rabbit_semi_durable_route, Match))).
remove_for_destination(DstName, OnlyDurable) ->
remove_for_destination(DstName, OnlyDurable, fun remove_routes/1).
@@ -331,8 +339,8 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end, ErrFun).
-delete_object(Table, Record, LockKind) ->
- mnesia:delete_object(Table, Record, LockKind).
+dirty_delete_object(Table, Record, _LockKind) ->
+ mnesia:dirty_delete_object(Table, Record).
sync_route(Route, true, true, Fun) ->
ok = Fun(rabbit_durable_route, Route, write),
@@ -393,66 +401,56 @@ continue('$end_of_table') -> false;
continue({[_|_], _}) -> true;
continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-%% For bulk operations we lock the tables we are operating on in order
-%% to reduce the time complexity. Without the table locks we end up
-%% with num_tables*num_bulk_bindings row-level locks. Taking each lock
-%% takes time proportional to the number of existing locks, thus
-%% resulting in O(num_bulk_bindings^2) complexity.
-%%
-%% The locks need to be write locks since ultimately we end up
-%% removing all these rows.
-%%
-%% The downside of all this is that no other binding operations except
-%% lookup/routing (which uses dirty ops) can take place
-%% concurrently. However, that is the case already since the bulk
-%% operations involve mnesia:match_object calls with a partial key,
-%% which entails taking a table lock.
-lock_route_tables() ->
- [mnesia:lock({table, T}, write) || T <- [rabbit_route,
- rabbit_reverse_route,
- rabbit_semi_durable_route,
- rabbit_durable_route]].
-
remove_routes(Routes) ->
%% This partitioning allows us to suppress unnecessary delete
%% operations on disk tables, which require an fsync.
{RamRoutes, DiskRoutes} =
- lists:partition(fun (R) -> mnesia:match_object(
- rabbit_durable_route, R, write) == [] end,
+ lists:partition(fun (R) -> mnesia:dirty_match_object(
+ rabbit_durable_route, R) == [] end,
Routes),
%% Of course the destination might not really be durable but it's
%% just as easy to try to delete it from the semi-durable table
%% than check first
- [ok = sync_route(R, false, true, fun mnesia:delete_object/3) ||
+ [ok = sync_route(R, false, true, fun dirty_delete_object/3) ||
R <- RamRoutes],
- [ok = sync_route(R, true, true, fun mnesia:delete_object/3) ||
+ [ok = sync_route(R, true, true, fun dirty_delete_object/3) ||
R <- DiskRoutes],
[R#route.binding || R <- Routes].
remove_transient_routes(Routes) ->
[begin
- ok = sync_transient_route(R, fun delete_object/3),
+ ok = sync_transient_route(R, fun dirty_delete_object/3),
R#route.binding
end || R <- Routes].
remove_for_destination(DstName, OnlyDurable, Fun) ->
- lock_route_tables(),
+ lock_resource(DstName),
MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}},
MatchRev = reverse_route(MatchFwd),
Routes = case OnlyDurable of
- false -> [reverse_route(R) ||
- R <- mnesia:match_object(
- rabbit_reverse_route, MatchRev, write)];
+ false ->
+ [reverse_route(R) ||
+ R <- mnesia:dirty_match_object(
+ rabbit_reverse_route, MatchRev)];
true -> lists:usort(
- mnesia:match_object(
- rabbit_durable_route, MatchFwd, write) ++
- mnesia:match_object(
- rabbit_semi_durable_route, MatchFwd, write))
+ mnesia:dirty_match_object(
+ rabbit_durable_route, MatchFwd) ++
+ mnesia:dirty_match_object(
+ rabbit_semi_durable_route, MatchFwd))
end,
Bindings = Fun(Routes),
group_bindings_fold(fun maybe_auto_delete/4, new_deletions(),
lists:keysort(#binding.source, Bindings), OnlyDurable).
+%% Instead of locking entire table on remove operations we can lock the
+%% affected resource only. This will allow us to use dirty_match_object for
+%% do faster search of records to delete.
+%% This works better when there are multiple resources deleted at once, for
+%% example when exclusive queues are deleted.
+lock_resource(Name) ->
+ mnesia:lock({global, Name, mnesia:table_info(rabbit_route, where_to_write)},
+ write).
+
%% Requires that its input binding list is sorted in exchange-name
%% order, so that the grouping of bindings (for passing to
%% group_bindings_and_auto_delete1) works properly.