diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 64 |
3 files changed, 40 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ac79d56358..f4f6d49c72 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -981,7 +981,7 @@ internal_delete(QueueName, ActingUser) -> ?INTERNAL_USER), fun() -> ok = T(), - rabbit_core_metrics:queue_deleted(QueueName), + rabbit_core_metrics:queue_deleted(QueueName), ok = rabbit_event:notify(queue_deleted, [{name, QueueName}, {user_who_performed_action, ActingUser}]) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a3c8f99519..11f272073b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -311,8 +311,14 @@ terminate_delete(EmitStats, Reason, fun() -> emit_stats(State) end); true -> ok end, - %% don't care if the internal delete doesn't return 'ok'. - rabbit_amqqueue:internal_delete(QName, ActingUser), + %% This try-catch block transforms throws to errors since throws are not + %% logged. + try + %% don't care if the internal delete doesn't return 'ok'. + rabbit_amqqueue:internal_delete(QName, ActingUser) + catch + {error, Reason} -> error(Reason) + end, BQS1 end. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 7498d6b765..06a71ce995 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -166,6 +166,8 @@ add(Binding, InnerFun, ActingUser) -> fun (Src, Dst, B) -> case rabbit_exchange:validate_binding(Src, B) of ok -> + lock_resource(Src), + lock_resource(Dst), %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to %% anything else @@ -184,6 +186,8 @@ add(Binding, InnerFun, ActingUser) -> end, fun not_found_or_absent_errs/1). add(Src, Dst, B, ActingUser) -> + lock_resource(Src), + lock_resource(Dst), [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], case (SrcDurable andalso DstDurable andalso mnesia:read({rabbit_durable_route, B}) =/= []) of @@ -206,6 +210,8 @@ remove(Binding, InnerFun, ActingUser) -> binding_action( Binding, fun (Src, Dst, B) -> + lock_resource(Src), + lock_resource(Dst), case mnesia:read(rabbit_route, B, write) of [] -> case mnesia:read(rabbit_durable_route, B, write) of [] -> rabbit_misc:const(ok); @@ -219,6 +225,8 @@ remove(Binding, InnerFun, ActingUser) -> end, fun absent_errs_only/1). remove(Src, Dst, B, ActingUser) -> + lock_resource(Src), + lock_resource(Dst), ok = sync_route(#route{binding = B}, durable(Src), durable(Dst), fun mnesia:delete_object/3), Deletions = maybe_auto_delete( @@ -303,12 +311,12 @@ has_for_source(SrcName) -> contains(rabbit_semi_durable_route, Match). remove_for_source(SrcName) -> - lock_route_tables(), + lock_resource(SrcName), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, remove_routes( lists:usort( - mnesia:match_object(rabbit_route, Match, write) ++ - mnesia:match_object(rabbit_semi_durable_route, Match, write))). + mnesia:dirty_match_object(rabbit_route, Match) ++ + mnesia:dirty_match_object(rabbit_semi_durable_route, Match))). remove_for_destination(DstName, OnlyDurable) -> remove_for_destination(DstName, OnlyDurable, fun remove_routes/1). @@ -393,32 +401,12 @@ continue('$end_of_table') -> false; continue({[_|_], _}) -> true; continue({[], Continuation}) -> continue(mnesia:select(Continuation)). -%% For bulk operations we lock the tables we are operating on in order -%% to reduce the time complexity. Without the table locks we end up -%% with num_tables*num_bulk_bindings row-level locks. Taking each lock -%% takes time proportional to the number of existing locks, thus -%% resulting in O(num_bulk_bindings^2) complexity. -%% -%% The locks need to be write locks since ultimately we end up -%% removing all these rows. -%% -%% The downside of all this is that no other binding operations except -%% lookup/routing (which uses dirty ops) can take place -%% concurrently. However, that is the case already since the bulk -%% operations involve mnesia:match_object calls with a partial key, -%% which entails taking a table lock. -lock_route_tables() -> - [mnesia:lock({table, T}, write) || T <- [rabbit_route, - rabbit_reverse_route, - rabbit_semi_durable_route, - rabbit_durable_route]]. - remove_routes(Routes) -> %% This partitioning allows us to suppress unnecessary delete %% operations on disk tables, which require an fsync. {RamRoutes, DiskRoutes} = - lists:partition(fun (R) -> mnesia:match_object( - rabbit_durable_route, R, write) == [] end, + lists:partition(fun (R) -> mnesia:dirty_match_object( + rabbit_durable_route, R) == [] end, Routes), %% Of course the destination might not really be durable but it's %% just as easy to try to delete it from the semi-durable table @@ -436,23 +424,33 @@ remove_transient_routes(Routes) -> end || R <- Routes]. remove_for_destination(DstName, OnlyDurable, Fun) -> - lock_route_tables(), + lock_resource(DstName), MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}}, MatchRev = reverse_route(MatchFwd), Routes = case OnlyDurable of - false -> [reverse_route(R) || - R <- mnesia:match_object( - rabbit_reverse_route, MatchRev, write)]; + false -> + [reverse_route(R) || + R <- mnesia:dirty_match_object( + rabbit_reverse_route, MatchRev)]; true -> lists:usort( - mnesia:match_object( - rabbit_durable_route, MatchFwd, write) ++ - mnesia:match_object( - rabbit_semi_durable_route, MatchFwd, write)) + mnesia:dirty_match_object( + rabbit_durable_route, MatchFwd) ++ + mnesia:dirty_match_object( + rabbit_semi_durable_route, MatchFwd)) end, Bindings = Fun(Routes), group_bindings_fold(fun maybe_auto_delete/4, new_deletions(), lists:keysort(#binding.source, Bindings), OnlyDurable). +%% Instead of locking entire table on remove operations we can lock the +%% affected resource only. This will allow us to use dirty_match_object for +%% do faster search of records to delete. +%% This works better when there are multiple resources deleted at once, for +%% example when exclusive queues are deleted. +lock_resource(Name) -> + mnesia:lock({global, Name, mnesia:table_info(rabbit_route, where_to_write)}, + write). + %% Requires that its input binding list is sorted in exchange-name %% order, so that the grouping of bindings (for passing to %% group_bindings_and_auto_delete1) works properly. |
