summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_binding.erl73
-rw-r--r--src/rabbit_queue_location_min_masters.erl70
3 files changed, 93 insertions, 66 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d938bece8c..f0d5e825d1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
emit_info_all/5, list_local/1, info_local/1,
emit_info_local/4, emit_info_down/4]).
--export([list_down/1, count/1, list_names/0, list_local_names/0]).
+-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]).
-export([list_by_type/1]).
-export([notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
@@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
not_found -> Q1 = rabbit_policy:set(Q),
Q2 = Q1#amqqueue{state = live},
ok = store_queue(Q2),
- B = add_default_binding(Q2),
- fun () -> B(), {created, Q2} end;
+ fun () -> {created, Q2} end;
{absent, _Q, _} = R -> rabbit_misc:const(R)
end;
[ExistingQ] ->
@@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1},
%% mirroring-related has changed - the policy may have changed anyway.
notify_policy_changed(Q1).
-add_default_binding(#amqqueue{name = QueueName}) ->
- ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
- RoutingKey = QueueName#resource.name,
- rabbit_binding:add(#binding{source = ExchangeName,
- destination = QueueName,
- key = RoutingKey,
- args = []},
- ?INTERNAL_USER).
-
lookup([]) -> []; %% optimisation
lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
@@ -757,6 +747,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
list_names() -> mnesia:dirty_all_keys(rabbit_queue).
+list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)].
+
list_local_names() ->
[ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(),
State =/= crashed, is_local_to_node(QPid, node())].
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e96dfd7673..258e85ffa2 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -27,6 +27,10 @@
-export([has_for_source/1, remove_for_source/1,
remove_for_destination/2, remove_transient_for_destination/1]).
+-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
+ kind = exchange,
+ name = <<>>}).
+
%%----------------------------------------------------------------------------
-export_type([key/0, deletions/0]).
@@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
(Serial, false) -> x_callback(Serial, X, add_binding, B)
end).
+exists(#binding{source = ?DEFAULT_EXCHANGE(_),
+ destination = #resource{kind = queue, name = QName} = Queue,
+ key = QName,
+ args = []}) ->
+ case rabbit_amqqueue:lookup(Queue) of
+ {ok, _} -> true;
+ {error, not_found} -> false
+ end;
exists(Binding) ->
binding_action(
Binding, fun (_Src, _Dst, B) ->
@@ -243,9 +255,17 @@ list(VHostPath) ->
destination = VHostResource,
_ = '_'},
_ = '_'},
- [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
- Route)].
-
+ %% if there are any default exchange bindings left after an upgrade
+ %% of a pre-3.8 database, filter them out
+ AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)],
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_bindings(VHostPath) ++ Filtered.
+
+list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
+ implicit_bindings(VHostPath);
list_for_source(SrcName) ->
mnesia:async_dirty(
fun() ->
@@ -255,16 +275,43 @@ list_for_source(SrcName) ->
end).
list_for_destination(DstName) ->
- mnesia:async_dirty(
- fun() ->
- Route = #route{binding = #binding{destination = DstName,
- _ = '_'}},
- [reverse_binding(B) ||
- #reverse_route{reverse_binding = B} <-
- mnesia:match_object(rabbit_reverse_route,
- reverse_route(Route), read)]
- end).
-
+ implicit_for_destination(DstName) ++
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{destination = DstName,
+ _ = '_'}},
+ [reverse_binding(B) ||
+ #reverse_route{reverse_binding = B} <-
+ mnesia:match_object(rabbit_reverse_route,
+ reverse_route(Route), read)]
+ end).
+
+implicit_bindings(VHostPath) ->
+ DstQueues = rabbit_amqqueue:list_names(VHostPath),
+ [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}
+ || DstQueue = #resource{name = QName} <- DstQueues ].
+
+implicit_for_destination(DstQueue = #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName}) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
+implicit_for_destination(_) ->
+ [].
+
+list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
+ #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName} = DstQueue) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
list_for_source_and_destination(SrcName, DstName) ->
mnesia:async_dirty(
fun() ->
diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl
index c532714d41..7c386855f6 100644
--- a/src/rabbit_queue_location_min_masters.erl
+++ b/src/rabbit_queue_location_min_masters.erl
@@ -38,45 +38,33 @@ description() ->
<<"Locate queue master node from cluster node with least bound queues">>}].
queue_master_location(#amqqueue{} = Q) ->
- Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
- VHosts = rabbit_vhost:list(),
- BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []),
- {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters),
- {ok, MinMaster}.
+ Cluster = rabbit_queue_master_location_misc:all_nodes(Q),
+ QueueNames = rabbit_amqqueue:list_names(),
+ MastersPerNode = lists:foldl(
+ fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) ->
+ case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of
+ {ok, Master} when is_atom(Master) ->
+ case maps:is_key(Master, NodeMasters) of
+ true -> maps:update_with(Master,
+ fun(N) -> N + 1 end,
+ NodeMasters);
+ false -> NodeMasters
+ end;
+ _ -> NodeMasters
+ end
+ end,
+ maps:from_list([{N, 0} || N <- Cluster]),
+ QueueNames),
-%%---------------------------------------------------------------------------
-%% Private helper functions
-%%---------------------------------------------------------------------------
-get_min_master(Cluster, BoundQueueMasters) ->
- lists:min([ {count_masters(Node, BoundQueueMasters), Node} ||
- Node <- Cluster ]).
-
-count_masters(Node, Masters) ->
- length([ X || X <- Masters, X == Node ]).
-
-get_bound_queue_masters_per_vhost([], Acc) ->
- lists:flatten(Acc);
-get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) ->
- BoundQueueNames =
- lists:filtermap(
- fun(#binding{destination =#resource{kind = queue,
- name = QueueName}}) ->
- {true, QueueName};
- (_) ->
- false
- end,
- rabbit_binding:list(VHost)),
- UniqQueueNames = lists:usort(BoundQueueNames),
- BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []),
- get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]).
-
-
-get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes;
-get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) ->
- QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master(
- QueueName, VHost) of
- {ok, Master} when is_atom(Master) ->
- [Master|QueueMastersAcc];
- _ -> QueueMastersAcc
- end,
- get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0).
+ {MinNode, _NMasters} = maps:fold(
+ fun(Node, NMasters, init) ->
+ {Node, NMasters};
+ (Node, NMasters, {MinNode, MinMasters}) ->
+ case NMasters < MinMasters of
+ true -> {Node, NMasters};
+ false -> {MinNode, MinMasters}
+ end
+ end,
+ init,
+ MastersPerNode),
+ {ok, MinNode}. \ No newline at end of file