summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_binding.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_binding.erl')
-rw-r--r--deps/rabbit/src/rabbit_binding.erl691
1 files changed, 691 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_binding.erl b/deps/rabbit/src/rabbit_binding.erl
new file mode 100644
index 0000000000..6ef25c4e60
--- /dev/null
+++ b/deps/rabbit/src/rabbit_binding.erl
@@ -0,0 +1,691 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_binding).
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("amqqueue.hrl").
+
+-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/2, remove/3, remove/4]).
+-export([list/1, list_for_source/1, list_for_destination/1,
+ list_for_source_and_destination/2, list_explicit/0]).
+-export([new_deletions/0, combine_deletions/2, add_deletion/3,
+ process_deletions/2, binding_action/3]).
+-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
+%% these must all be run inside a mnesia tx
+-export([has_for_source/1, remove_for_source/1,
+ remove_for_destination/2, remove_transient_for_destination/1,
+ remove_default_exchange_binding_rows_of/1]).
+
+-export([implicit_for_destination/1, reverse_binding/1]).
+-export([new/4]).
+
+-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
+ kind = exchange,
+ name = <<>>}).
+
+%%----------------------------------------------------------------------------
+
+-export_type([key/0, deletions/0]).
+
+-type key() :: binary().
+
+-type bind_errors() :: rabbit_types:error(
+ {'resources_missing',
+ [{'not_found', (rabbit_types:binding_source() |
+ rabbit_types:binding_destination())} |
+ {'absent', amqqueue:amqqueue()}]}).
+
+-type bind_ok_or_error() :: 'ok' | bind_errors() |
+ rabbit_types:error(
+ {'binding_invalid', string(), [any()]}).
+-type bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error()).
+-type inner_fun() ::
+ fun((rabbit_types:exchange(),
+ rabbit_types:exchange() | amqqueue:amqqueue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error())).
+-type bindings() :: [rabbit_types:binding()].
+
+%% TODO this should really be opaque but that seems to confuse 17.1's
+%% dialyzer into objecting to everything that uses it.
+-type deletions() :: dict:dict().
+
+%%----------------------------------------------------------------------------
+
+-spec new(rabbit_types:exchange(),
+ key(),
+ rabbit_types:exchange() | amqqueue:amqqueue(),
+ rabbit_framing:amqp_table()) ->
+ rabbit_types:binding().
+
+new(Src, RoutingKey, Dst, #{}) ->
+ new(Src, RoutingKey, Dst, []);
+new(Src, RoutingKey, Dst, Arguments) when is_map(Arguments) ->
+ new(Src, RoutingKey, Dst, maps:to_list(Arguments));
+new(Src, RoutingKey, Dst, Arguments) ->
+ #binding{source = Src, key = RoutingKey, destination = Dst, args = Arguments}.
+
+
+-define(INFO_KEYS, [source_name, source_kind,
+ destination_name, destination_kind,
+ routing_key, arguments,
+ vhost]).
+
+%% Global table recovery
+
+-spec recover([rabbit_exchange:name()], [rabbit_amqqueue:name()]) ->
+ 'ok'.
+
+recover() ->
+ rabbit_misc:table_filter(
+ fun (Route) ->
+ mnesia:read({rabbit_semi_durable_route, Route}) =:= []
+ end,
+ fun (Route, true) ->
+ ok = mnesia:write(rabbit_semi_durable_route, Route, write);
+ (_Route, false) ->
+ ok
+ end, rabbit_durable_route).
+
+%% Virtual host-specific recovery
+recover(XNames, QNames) ->
+ XNameSet = sets:from_list(XNames),
+ QNameSet = sets:from_list(QNames),
+ SelectSet = fun (#resource{kind = exchange}) -> XNameSet;
+ (#resource{kind = queue}) -> QNameSet
+ end,
+ {ok, Gatherer} = gatherer:start_link(),
+ [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) ||
+ R = #route{binding = #binding{destination = Dst}} <-
+ rabbit_misc:dirty_read_all(rabbit_semi_durable_route)],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
+ ok.
+
+recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) ->
+ #binding{source = Src, destination = Dst} = B,
+ case sets:is_element(Dst, ToRecover) of
+ true -> {ok, X} = rabbit_exchange:lookup(Src),
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ recover_semi_durable_route_txn(R, X),
+ gatherer:finish(Gatherer)
+ end);
+ false -> ok
+ end.
+
+recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read(rabbit_semi_durable_route, B, read) of
+ [] -> no_recover;
+ _ -> ok = sync_transient_route(R, fun mnesia:write/3),
+ rabbit_exchange:serial(X)
+ end
+ end,
+ fun (no_recover, _) -> ok;
+ (_Serial, true) -> x_callback(transaction, X, add_binding, B);
+ (Serial, false) -> x_callback(Serial, X, add_binding, B)
+ end).
+
+-spec exists(rabbit_types:binding()) -> boolean() | bind_errors().
+
+exists(#binding{source = ?DEFAULT_EXCHANGE(_),
+ destination = #resource{kind = queue, name = QName} = Queue,
+ key = QName,
+ args = []}) ->
+ case rabbit_amqqueue:lookup(Queue) of
+ {ok, _} -> true;
+ {error, not_found} -> false
+ end;
+exists(Binding) ->
+ binding_action(
+ Binding, fun (_Src, _Dst, B) ->
+ rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
+ end, fun not_found_or_absent_errs/1).
+
+-spec add(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
+
+add(Binding, ActingUser) -> add(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
+
+-spec add(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
+
+add(Binding, InnerFun, ActingUser) ->
+ binding_action(
+ Binding,
+ fun (Src, Dst, B) ->
+ case rabbit_exchange:validate_binding(Src, B) of
+ ok ->
+ lock_resource(Src, read),
+ lock_resource(Dst, read),
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(Src, Dst) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> add(Src, Dst, B, ActingUser);
+ [_] -> fun () -> ok end
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end
+ end, fun not_found_or_absent_errs/1).
+
+add(Src, Dst, B, ActingUser) ->
+ [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
+ ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ fun mnesia:write/3),
+ x_callback(transaction, Src, add_binding, B),
+ Serial = rabbit_exchange:serial(Src),
+ fun () ->
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(
+ binding_created,
+ info(B) ++ [{user_who_performed_action, ActingUser}])
+ end.
+
+-spec remove(rabbit_types:binding()) -> bind_res().
+remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end, ?INTERNAL_USER).
+
+-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
+remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
+
+
+-spec remove(rabbit_types:binding(), inner_fun(), rabbit_types:username()) -> bind_res().
+remove(Binding, InnerFun, ActingUser) ->
+ binding_action(
+ Binding,
+ fun (Src, Dst, B) ->
+ lock_resource(Src, read),
+ lock_resource(Dst, read),
+ case mnesia:read(rabbit_route, B, write) of
+ [] -> case mnesia:read(rabbit_durable_route, B, write) of
+ [] -> rabbit_misc:const(ok);
+ %% We still delete the binding and run
+ %% all post-delete functions if there is only
+ %% a durable route in the database
+ _ -> remove(Src, Dst, B, ActingUser)
+ end;
+ _ -> case InnerFun(Src, Dst) of
+ ok -> remove(Src, Dst, B, ActingUser);
+ {error, _} = Err -> rabbit_misc:const(Err)
+ end
+ end
+ end, fun absent_errs_only/1).
+
+remove(Src, Dst, B, ActingUser) ->
+ ok = sync_route(#route{binding = B}, durable(Src), durable(Dst),
+ fun delete/3),
+ Deletions = maybe_auto_delete(
+ B#binding.source, [B], new_deletions(), false),
+ process_deletions(Deletions, ActingUser).
+
+%% Implicit bindings are implicit as of rabbitmq/rabbitmq-server#1721.
+remove_default_exchange_binding_rows_of(Dst = #resource{}) ->
+ case implicit_for_destination(Dst) of
+ [Binding] ->
+ mnesia:dirty_delete(rabbit_durable_route, Binding),
+ mnesia:dirty_delete(rabbit_semi_durable_route, Binding),
+ mnesia:dirty_delete(rabbit_reverse_route,
+ reverse_binding(Binding)),
+ mnesia:dirty_delete(rabbit_route, Binding);
+ _ ->
+ %% no binding to remove or
+ %% a competing tx has beaten us to it?
+ ok
+ end,
+ ok.
+
+-spec list_explicit() -> bindings().
+
+list_explicit() ->
+ mnesia:async_dirty(
+ fun () ->
+ AllRoutes = mnesia:dirty_match_object(rabbit_route, #route{_ = '_'}),
+ %% if there are any default exchange bindings left after an upgrade
+ %% of a pre-3.8 database, filter them out
+ AllBindings = [B || #route{binding = B} <- AllRoutes],
+ lists:filter(fun(#binding{source = S}) ->
+ not (S#resource.kind =:= exchange andalso S#resource.name =:= <<>>)
+ end, AllBindings)
+ end).
+
+-spec list(rabbit_types:vhost()) -> bindings().
+
+list(VHostPath) ->
+ VHostResource = rabbit_misc:r(VHostPath, '_'),
+ Route = #route{binding = #binding{source = VHostResource,
+ destination = VHostResource,
+ _ = '_'},
+ _ = '_'},
+ %% if there are any default exchange bindings left after an upgrade
+ %% of a pre-3.8 database, filter them out
+ AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)],
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_bindings(VHostPath) ++ Filtered.
+
+-spec list_for_source
+ (rabbit_types:binding_source()) -> bindings().
+
+list_for_source(?DEFAULT_EXCHANGE(VHostPath)) ->
+ implicit_bindings(VHostPath);
+list_for_source(SrcName) ->
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{source = SrcName, _ = '_'}},
+ [B || #route{binding = B}
+ <- mnesia:match_object(rabbit_route, Route, read)]
+ end).
+
+-spec list_for_destination
+ (rabbit_types:binding_destination()) -> bindings().
+
+list_for_destination(DstName = #resource{virtual_host = VHostPath}) ->
+ AllBindings = mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{destination = DstName,
+ _ = '_'}},
+ [reverse_binding(B) ||
+ #reverse_route{reverse_binding = B} <-
+ mnesia:match_object(rabbit_reverse_route,
+ reverse_route(Route), read)]
+ end),
+ Filtered = lists:filter(fun(#binding{source = S}) ->
+ S =/= ?DEFAULT_EXCHANGE(VHostPath)
+ end, AllBindings),
+ implicit_for_destination(DstName) ++ Filtered.
+
+implicit_bindings(VHostPath) ->
+ DstQueues = rabbit_amqqueue:list_names(VHostPath),
+ [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}
+ || DstQueue = #resource{name = QName} <- DstQueues ].
+
+implicit_for_destination(DstQueue = #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName}) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
+implicit_for_destination(_) ->
+ [].
+
+-spec list_for_source_and_destination
+ (rabbit_types:binding_source(), rabbit_types:binding_destination()) ->
+ bindings().
+
+list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath),
+ #resource{kind = queue,
+ virtual_host = VHostPath,
+ name = QName} = DstQueue) ->
+ [#binding{source = ?DEFAULT_EXCHANGE(VHostPath),
+ destination = DstQueue,
+ key = QName,
+ args = []}];
+list_for_source_and_destination(SrcName, DstName) ->
+ mnesia:async_dirty(
+ fun() ->
+ Route = #route{binding = #binding{source = SrcName,
+ destination = DstName,
+ _ = '_'}},
+ [B || #route{binding = B} <- mnesia:match_object(rabbit_route,
+ Route, read)]
+ end).
+
+-spec info_keys() -> rabbit_types:info_keys().
+
+info_keys() -> ?INFO_KEYS.
+
+map(VHostPath, F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ lists:map(F, list(VHostPath)).
+
+infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
+
+i(source_name, #binding{source = SrcName}) -> SrcName#resource.name;
+i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind;
+i(vhost, #binding{source = SrcName}) -> SrcName#resource.virtual_host;
+i(destination_name, #binding{destination = DstName}) -> DstName#resource.name;
+i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
+i(Item, _) -> throw({bad_argument, Item}).
+
+-spec info(rabbit_types:binding()) -> rabbit_types:infos().
+
+info(B = #binding{}) -> infos(?INFO_KEYS, B).
+
+-spec info(rabbit_types:binding(), rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+
+info(B = #binding{}, Items) -> infos(Items, B).
+
+-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
+
+info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
+
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
+ [rabbit_types:infos()].
+
+info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
+
+-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys(),
+ reference(), pid()) -> 'ok'.
+
+info_all(VHostPath, Items, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map(
+ AggregatorPid, Ref, fun(B) -> info(B, Items) end, list(VHostPath)).
+
+-spec has_for_source(rabbit_types:binding_source()) -> boolean().
+
+has_for_source(SrcName) ->
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
+ %% we need to check for semi-durable routes (which subsumes
+ %% durable routes) here too in case a bunch of routes to durable
+ %% queues have been removed temporarily as a result of a node
+ %% failure
+ contains(rabbit_route, Match) orelse
+ contains(rabbit_semi_durable_route, Match).
+
+-spec remove_for_source(rabbit_types:binding_source()) -> bindings().
+
+remove_for_source(SrcName) ->
+ lock_resource(SrcName),
+ Match = #route{binding = #binding{source = SrcName, _ = '_'}},
+ remove_routes(
+ lists:usort(
+ mnesia:dirty_match_object(rabbit_route, Match) ++
+ mnesia:dirty_match_object(rabbit_semi_durable_route, Match))).
+
+-spec remove_for_destination
+ (rabbit_types:binding_destination(), boolean()) -> deletions().
+
+remove_for_destination(DstName, OnlyDurable) ->
+ remove_for_destination(DstName, OnlyDurable, fun remove_routes/1).
+
+-spec remove_transient_for_destination
+ (rabbit_types:binding_destination()) -> deletions().
+
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, false, fun remove_transient_routes/1).
+
+%%----------------------------------------------------------------------------
+
+durable(#exchange{durable = D}) -> D;
+durable(Q) when ?is_amqqueue(Q) ->
+ amqqueue:is_durable(Q).
+
+binding_action(Binding = #binding{source = SrcName,
+ destination = DstName,
+ args = Arguments}, Fun, ErrFun) ->
+ call_with_source_and_destination(
+ SrcName, DstName,
+ fun (Src, Dst) ->
+ SortedArgs = rabbit_misc:sort_field_table(Arguments),
+ Fun(Src, Dst, Binding#binding{args = SortedArgs})
+ end, ErrFun).
+
+sync_route(Route, true, true, Fun) ->
+ ok = Fun(rabbit_durable_route, Route, write),
+ sync_route(Route, false, true, Fun);
+
+sync_route(Route, false, true, Fun) ->
+ ok = Fun(rabbit_semi_durable_route, Route, write),
+ sync_route(Route, false, false, Fun);
+
+sync_route(Route, _SrcDurable, false, Fun) ->
+ sync_transient_route(Route, Fun).
+
+sync_transient_route(Route, Fun) ->
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, reverse_route(Route), write).
+
+call_with_source_and_destination(SrcName, DstName, Fun, ErrFun) ->
+ SrcTable = table_for_resource(SrcName),
+ DstTable = table_for_resource(DstName),
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () ->
+ case {mnesia:read({SrcTable, SrcName}),
+ mnesia:read({DstTable, DstName})} of
+ {[Src], [Dst]} -> Fun(Src, Dst);
+ {[], [_] } -> ErrFun([SrcName]);
+ {[_], [] } -> ErrFun([DstName]);
+ {[], [] } -> ErrFun([SrcName, DstName])
+ end
+ end).
+
+not_found_or_absent_errs(Names) ->
+ Errs = [not_found_or_absent(Name) || Name <- Names],
+ rabbit_misc:const({error, {resources_missing, Errs}}).
+
+absent_errs_only(Names) ->
+ Errs = [E || Name <- Names,
+ {absent, _Q, _Reason} = E <- [not_found_or_absent(Name)]],
+ rabbit_misc:const(case Errs of
+ [] -> ok;
+ _ -> {error, {resources_missing, Errs}}
+ end).
+
+table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
+table_for_resource(#resource{kind = queue}) -> rabbit_queue.
+
+not_found_or_absent(#resource{kind = exchange} = Name) ->
+ {not_found, Name};
+not_found_or_absent(#resource{kind = queue} = Name) ->
+ case rabbit_amqqueue:not_found_or_absent(Name) of
+ not_found -> {not_found, Name};
+ {absent, _Q, _Reason} = R -> R
+ end.
+
+contains(Table, MatchHead) ->
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+remove_routes(Routes) ->
+ %% This partitioning allows us to suppress unnecessary delete
+ %% operations on disk tables, which require an fsync.
+ {RamRoutes, DiskRoutes} =
+ lists:partition(fun (R) -> mnesia:read(
+ rabbit_durable_route, R#route.binding, read) == [] end,
+ Routes),
+ {RamOnlyRoutes, SemiDurableRoutes} =
+ lists:partition(fun (R) -> mnesia:read(
+ rabbit_semi_durable_route, R#route.binding, read) == [] end,
+ RamRoutes),
+ %% Of course the destination might not really be durable but it's
+ %% just as easy to try to delete it from the semi-durable table
+ %% than check first
+ [ok = sync_route(R, true, true, fun delete/3) ||
+ R <- DiskRoutes],
+ [ok = sync_route(R, false, true, fun delete/3) ||
+ R <- SemiDurableRoutes],
+ [ok = sync_route(R, false, false, fun delete/3) ||
+ R <- RamOnlyRoutes],
+ [R#route.binding || R <- Routes].
+
+
+delete(Tab, #route{binding = B}, LockKind) ->
+ mnesia:delete(Tab, B, LockKind);
+delete(Tab, #reverse_route{reverse_binding = B}, LockKind) ->
+ mnesia:delete(Tab, B, LockKind).
+
+remove_transient_routes(Routes) ->
+ [begin
+ ok = sync_transient_route(R, fun delete/3),
+ R#route.binding
+ end || R <- Routes].
+
+remove_for_destination(DstName, OnlyDurable, Fun) ->
+ lock_resource(DstName),
+ MatchFwd = #route{binding = #binding{destination = DstName, _ = '_'}},
+ MatchRev = reverse_route(MatchFwd),
+ Routes = case OnlyDurable of
+ false ->
+ [reverse_route(R) ||
+ R <- mnesia:dirty_match_object(
+ rabbit_reverse_route, MatchRev)];
+ true -> lists:usort(
+ mnesia:dirty_match_object(
+ rabbit_durable_route, MatchFwd) ++
+ mnesia:dirty_match_object(
+ rabbit_semi_durable_route, MatchFwd))
+ end,
+ Bindings = Fun(Routes),
+ group_bindings_fold(fun maybe_auto_delete/4, new_deletions(),
+ lists:keysort(#binding.source, Bindings), OnlyDurable).
+
+%% Instead of locking entire table on remove operations we can lock the
+%% affected resource only.
+lock_resource(Name) -> lock_resource(Name, write).
+
+lock_resource(Name, LockKind) ->
+ mnesia:lock({global, Name, mnesia:table_info(rabbit_route, where_to_write)},
+ LockKind).
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% group_bindings_and_auto_delete1) works properly.
+group_bindings_fold(_Fun, Acc, [], _OnlyDurable) ->
+ Acc;
+group_bindings_fold(Fun, Acc, [B = #binding{source = SrcName} | Bs],
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B], OnlyDurable).
+
+group_bindings_fold(
+ Fun, SrcName, Acc, [B = #binding{source = SrcName} | Bs], Bindings,
+ OnlyDurable) ->
+ group_bindings_fold(Fun, SrcName, Acc, Bs, [B | Bindings], OnlyDurable);
+group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings, OnlyDurable) ->
+ %% Either Removed is [], or its head has a non-matching SrcName.
+ group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc, OnlyDurable), Removed,
+ OnlyDurable).
+
+maybe_auto_delete(XName, Bindings, Deletions, OnlyDurable) ->
+ {Entry, Deletions1} =
+ case mnesia:read({case OnlyDurable of
+ true -> rabbit_durable_exchange;
+ false -> rabbit_exchange
+ end, XName}) of
+ [] -> {{undefined, not_deleted, Bindings}, Deletions};
+ [X] -> case rabbit_exchange:maybe_auto_delete(X, OnlyDurable) of
+ not_deleted ->
+ {{X, not_deleted, Bindings}, Deletions};
+ {deleted, Deletions2} ->
+ {{X, deleted, Bindings},
+ combine_deletions(Deletions, Deletions2)}
+ end
+ end,
+ add_deletion(XName, Entry, Deletions1).
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{source = SrcName,
+ destination = DstName,
+ key = Key,
+ args = Args}.
+
+%% ----------------------------------------------------------------------------
+%% Binding / exchange deletion abstraction API
+%% ----------------------------------------------------------------------------
+
+anything_but( NotThis, NotThis, NotThis) -> NotThis;
+anything_but( NotThis, NotThis, This) -> This;
+anything_but( NotThis, This, NotThis) -> This;
+anything_but(_NotThis, This, This) -> This.
+
+-spec new_deletions() -> deletions().
+
+new_deletions() -> dict:new().
+
+-spec add_deletion
+ (rabbit_exchange:name(),
+ {'undefined' | rabbit_types:exchange(),
+ 'deleted' | 'not_deleted',
+ bindings()},
+ deletions()) ->
+ deletions().
+
+add_deletion(XName, Entry, Deletions) ->
+ dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
+ Entry, Deletions).
+
+-spec combine_deletions(deletions(), deletions()) -> deletions().
+
+combine_deletions(Deletions1, Deletions2) ->
+ dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
+ Deletions1, Deletions2).
+
+merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
+ {anything_but(undefined, X1, X2),
+ anything_but(not_deleted, Deleted1, Deleted2),
+ [Bindings1 | Bindings2]}.
+
+-spec process_deletions(deletions(), rabbit_types:username()) -> rabbit_misc:thunk('ok').
+
+process_deletions(Deletions, ActingUser) ->
+ AugmentedDeletions =
+ dict:map(fun (_XName, {X, deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, delete, Bs),
+ {X, deleted, Bs, none};
+ (_XName, {X, not_deleted, Bindings}) ->
+ Bs = lists:flatten(Bindings),
+ x_callback(transaction, X, remove_bindings, Bs),
+ {X, not_deleted, Bs, rabbit_exchange:serial(X)}
+ end, Deletions),
+ fun() ->
+ dict:fold(fun (XName, {X, deleted, Bs, Serial}, ok) ->
+ ok = rabbit_event:notify(
+ exchange_deleted,
+ [{name, XName},
+ {user_who_performed_action, ActingUser}]),
+ del_notify(Bs, ActingUser),
+ x_callback(Serial, X, delete, Bs);
+ (_XName, {X, not_deleted, Bs, Serial}, ok) ->
+ del_notify(Bs, ActingUser),
+ x_callback(Serial, X, remove_bindings, Bs)
+ end, ok, AugmentedDeletions)
+ end.
+
+del_notify(Bs, ActingUser) -> [rabbit_event:notify(
+ binding_deleted,
+ info(B) ++ [{user_who_performed_action, ActingUser}])
+ || B <- Bs].
+
+x_callback(Serial, X, F, Bs) ->
+ ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).