diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-01-04 00:37:53 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-01-04 00:37:53 +0300 |
| commit | 67af90f499e7ea8b50d9c2315482b973f839395b (patch) | |
| tree | 23f4dd837ef836adb5848860c8bbfe01c76c24c3 /src | |
| parent | be0d28797810e22aba8c3250bd6090278fe87526 (diff) | |
| parent | 7ba04a6a681e3e3f805b582a7f0aa3f8361c248a (diff) | |
| download | rabbitmq-server-git-67af90f499e7ea8b50d9c2315482b973f839395b.tar.gz | |
Merge pull request #1721 from rabbitmq/remove-default-bindings
Remove default bindings
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 70 |
3 files changed, 93 insertions, 66 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d938bece8c..f0d5e825d1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]). -export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). @@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> not_found -> Q1 = rabbit_policy:set(Q), Q2 = Q1#amqqueue{state = live}, ok = store_queue(Q2), - B = add_default_binding(Q2), - fun () -> B(), {created, Q2} end; + fun () -> {created, Q2} end; {absent, _Q, _} = R -> rabbit_misc:const(R) end; [ExistingQ] -> @@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). -add_default_binding(#amqqueue{name = QueueName}) -> - ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), - RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{source = ExchangeName, - destination = QueueName, - key = RoutingKey, - args = []}, - ?INTERNAL_USER). - lookup([]) -> []; %% optimisation lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation lookup(Names) when is_list(Names) -> @@ -757,6 +747,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). +list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)]. + list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e96dfd7673..258e85ffa2 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -27,6 +27,10 @@ -export([has_for_source/1, remove_for_source/1, remove_for_destination/2, remove_transient_for_destination/1]). +-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath, + kind = exchange, + name = <<>>}). + %%---------------------------------------------------------------------------- -export_type([key/0, deletions/0]). @@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) -> (Serial, false) -> x_callback(Serial, X, add_binding, B) end). +exists(#binding{source = ?DEFAULT_EXCHANGE(_), + destination = #resource{kind = queue, name = QName} = Queue, + key = QName, + args = []}) -> + case rabbit_amqqueue:lookup(Queue) of + {ok, _} -> true; + {error, not_found} -> false + end; exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> @@ -243,9 +255,17 @@ list(VHostPath) -> destination = VHostResource, _ = '_'}, _ = '_'}, - [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, - Route)]. - + %% if there are any default exchange bindings left after an upgrade + %% of a pre-3.8 database, filter them out + AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)], + Filtered = lists:filter(fun(#binding{source = S}) -> + S =/= ?DEFAULT_EXCHANGE(VHostPath) + end, AllBindings), + implicit_bindings(VHostPath) ++ Filtered. + +list_for_source(?DEFAULT_EXCHANGE(VHostPath)) -> + implicit_bindings(VHostPath); list_for_source(SrcName) -> mnesia:async_dirty( fun() -> @@ -255,16 +275,43 @@ list_for_source(SrcName) -> end). list_for_destination(DstName) -> - mnesia:async_dirty( - fun() -> - Route = #route{binding = #binding{destination = DstName, - _ = '_'}}, - [reverse_binding(B) || - #reverse_route{reverse_binding = B} <- - mnesia:match_object(rabbit_reverse_route, - reverse_route(Route), read)] - end). - + implicit_for_destination(DstName) ++ + mnesia:async_dirty( + fun() -> + Route = #route{binding = #binding{destination = DstName, + _ = '_'}}, + [reverse_binding(B) || + #reverse_route{reverse_binding = B} <- + mnesia:match_object(rabbit_reverse_route, + reverse_route(Route), read)] + end). + +implicit_bindings(VHostPath) -> + DstQueues = rabbit_amqqueue:list_names(VHostPath), + [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []} + || DstQueue = #resource{name = QName} <- DstQueues ]. + +implicit_for_destination(DstQueue = #resource{kind = queue, + virtual_host = VHostPath, + name = QName}) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; +implicit_for_destination(_) -> + []. + +list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath), + #resource{kind = queue, + virtual_host = VHostPath, + name = QName} = DstQueue) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; list_for_source_and_destination(SrcName, DstName) -> mnesia:async_dirty( fun() -> diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index c532714d41..7c386855f6 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -38,45 +38,33 @@ description() -> <<"Locate queue master node from cluster node with least bound queues">>}]. queue_master_location(#amqqueue{} = Q) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(Q), - VHosts = rabbit_vhost:list(), - BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []), - {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters), - {ok, MinMaster}. + Cluster = rabbit_queue_master_location_misc:all_nodes(Q), + QueueNames = rabbit_amqqueue:list_names(), + MastersPerNode = lists:foldl( + fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) -> + case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of + {ok, Master} when is_atom(Master) -> + case maps:is_key(Master, NodeMasters) of + true -> maps:update_with(Master, + fun(N) -> N + 1 end, + NodeMasters); + false -> NodeMasters + end; + _ -> NodeMasters + end + end, + maps:from_list([{N, 0} || N <- Cluster]), + QueueNames), -%%--------------------------------------------------------------------------- -%% Private helper functions -%%--------------------------------------------------------------------------- -get_min_master(Cluster, BoundQueueMasters) -> - lists:min([ {count_masters(Node, BoundQueueMasters), Node} || - Node <- Cluster ]). - -count_masters(Node, Masters) -> - length([ X || X <- Masters, X == Node ]). - -get_bound_queue_masters_per_vhost([], Acc) -> - lists:flatten(Acc); -get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) -> - BoundQueueNames = - lists:filtermap( - fun(#binding{destination =#resource{kind = queue, - name = QueueName}}) -> - {true, QueueName}; - (_) -> - false - end, - rabbit_binding:list(VHost)), - UniqQueueNames = lists:usort(BoundQueueNames), - BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []), - get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]). - - -get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes; -get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) -> - QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master( - QueueName, VHost) of - {ok, Master} when is_atom(Master) -> - [Master|QueueMastersAcc]; - _ -> QueueMastersAcc - end, - get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0). + {MinNode, _NMasters} = maps:fold( + fun(Node, NMasters, init) -> + {Node, NMasters}; + (Node, NMasters, {MinNode, MinMasters}) -> + case NMasters < MinMasters of + true -> {Node, NMasters}; + false -> {MinNode, MinMasters} + end + end, + init, + MastersPerNode), + {ok, MinNode}.
\ No newline at end of file |
