summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_federation/test
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_federation/test')
-rw-r--r--deps/rabbitmq_federation/test/exchange_SUITE.erl1319
-rw-r--r--deps/rabbitmq_federation/test/federation_status_command_SUITE.erl168
-rw-r--r--deps/rabbitmq_federation/test/queue_SUITE.erl328
-rw-r--r--deps/rabbitmq_federation/test/rabbit_federation_status_SUITE.erl129
-rw-r--r--deps/rabbitmq_federation/test/rabbit_federation_test_util.erl354
-rw-r--r--deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl101
-rw-r--r--deps/rabbitmq_federation/test/unit_SUITE.erl65
-rw-r--r--deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl230
8 files changed, 2694 insertions, 0 deletions
diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl
new file mode 100644
index 0000000000..a0cd51c7c9
--- /dev/null
+++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl
@@ -0,0 +1,1319 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(exchange_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-include("rabbit_federation.hrl").
+
+-compile(export_all).
+
+-import(rabbit_federation_test_util,
+ [wait_for_federation/2, expect/3, expect/4, expect_empty/2,
+ set_upstream/4, set_upstream/5, set_upstream_in_vhost/5, set_upstream_in_vhost/6,
+ clear_upstream/3, set_upstream_set/4,
+ set_policy/5, set_policy_pattern/5, clear_policy/3,
+ set_policy_upstream/5, set_policy_upstreams/4,
+ all_federation_links/2, federation_links_in_vhost/3, status_fields/2]).
+
+-import(rabbit_ct_broker_helpers,
+ [set_policy_in_vhost/7]).
+
+all() ->
+ [
+ {group, without_automatic_setup},
+ {group, without_disambiguate},
+ {group, with_disambiguate}
+ ].
+
+groups() ->
+ [
+ {without_automatic_setup, [], [
+ message_cycle_detection_case2
+ ]},
+ {without_disambiguate, [], [
+ {cluster_size_1, [], [
+ simple,
+ multiple_upstreams,
+ multiple_upstreams_pattern,
+ multiple_uris,
+ multiple_downstreams,
+ e2e,
+ unbind_on_delete,
+ unbind_on_unbind,
+ unbind_gets_transmitted,
+ no_loop,
+ dynamic_reconfiguration,
+ dynamic_reconfiguration_integrity,
+ federate_unfederate,
+ dynamic_plugin_stop_start,
+ dynamic_plugin_cleanup_stop_start,
+ dynamic_policy_cleanup,
+ delete_federated_exchange_upstream,
+ delete_federated_queue_upstream
+ ]}
+ ]},
+ {with_disambiguate, [], [
+ {cluster_size_2, [], [
+ user_id,
+ message_cycle_detection_case1,
+ restart_upstream
+ ]},
+ {cluster_size_3, [], [
+ max_hops,
+ binding_propagation
+ ]},
+
+ {without_plugins, [], [
+ {cluster_size_2, [], [
+ upstream_has_no_federation
+ ]}
+ ]}
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {minutes, 5}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(without_automatic_setup, Config) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Suffix},
+ {rmq_nodes_clustered, false},
+ {rmq_nodes_count, 1}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+init_per_group(without_disambiguate, Config) ->
+ rabbit_ct_helpers:set_config(Config,
+ {disambiguate_step, []});
+init_per_group(with_disambiguate, Config) ->
+ rabbit_ct_helpers:set_config(Config,
+ {disambiguate_step, [fun rabbit_federation_test_util:disambiguate/1]});
+init_per_group(without_plugins, Config) ->
+ rabbit_ct_helpers:set_config(Config,
+ {broker_with_plugins, [true, false]});
+init_per_group(cluster_size_1 = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 1}
+ ]),
+ init_per_group1(Group, Config1);
+init_per_group(cluster_size_2 = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 2}
+ ]),
+ init_per_group1(Group, Config1);
+init_per_group(cluster_size_3 = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 3}
+ ]),
+ init_per_group1(Group, Config1).
+
+init_per_group1(Group, Config) ->
+ SetupFederation = case Group of
+ cluster_size_1 -> [fun rabbit_federation_test_util:setup_federation/1];
+ cluster_size_2 -> [];
+ cluster_size_3 -> []
+ end,
+ Disambiguate = ?config(disambiguate_step, Config),
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Suffix},
+ {rmq_nodes_clustered, false}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ SetupFederation ++ Disambiguate).
+
+end_per_group(without_disambiguate, Config) ->
+ Config;
+end_per_group(with_disambiguate, Config) ->
+ Config;
+end_per_group(without_plugins, Config) ->
+ Config;
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()
+ ).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+simple(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ Q = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key">>),
+ publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO">>)
+ end, upstream_downstream()).
+
+multiple_upstreams(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ Q = bind_queue(Ch, <<"fed12.downstream">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream2">>, <<"key">>),
+ publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>),
+ publish_expect(Ch, <<"upstream2">>, <<"key">>, Q, <<"HELLO2">>)
+ end, [x(<<"upstream">>),
+ x(<<"upstream2">>),
+ x(<<"fed12.downstream">>)]).
+
+multiple_upstreams_pattern(Config) ->
+ set_upstream(Config, 0, <<"local453x">>,
+ rabbit_ct_broker_helpers:node_uri(Config, 0), [
+ {<<"exchange">>, <<"upstream">>},
+ {<<"queue">>, <<"upstream">>}]),
+
+ set_upstream(Config, 0, <<"local3214x">>,
+ rabbit_ct_broker_helpers:node_uri(Config, 0), [
+ {<<"exchange">>, <<"upstream2">>},
+ {<<"queue">>, <<"upstream2">>}]),
+
+ set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>),
+
+ with_ch(Config,
+ fun (Ch) ->
+ Q = bind_queue(Ch, <<"pattern.downstream">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream2">>, <<"key">>),
+ publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>),
+ publish_expect(Ch, <<"upstream2">>, <<"key">>, Q, <<"HELLO2">>)
+ end, [x(<<"upstream">>),
+ x(<<"upstream2">>),
+ x(<<"pattern.downstream">>)]),
+
+ clear_upstream(Config, 0, <<"local453x">>),
+ clear_upstream(Config, 0, <<"local3214x">>),
+ clear_policy(Config, 0, <<"pattern">>).
+
+multiple_uris(Config) ->
+ %% We can't use a direct connection for Kill() to work.
+ URIs = [
+ rabbit_ct_broker_helpers:node_uri(Config, 0),
+ rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr])
+ ],
+ set_upstream(Config, 0, <<"localhost">>, URIs),
+ WithCh = fun(F) ->
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ F(Ch),
+ rabbit_ct_client_helpers:close_channels_and_connection(
+ Config, 0)
+ end,
+ WithCh(fun (Ch) -> declare_all(Ch, upstream_downstream()) end),
+ expect_uris(Config, 0, URIs),
+ WithCh(fun (Ch) -> delete_all(Ch, upstream_downstream()) end),
+ %% Put back how it was
+ rabbit_federation_test_util:setup_federation(Config),
+ ok.
+
+expect_uris(_, _, []) ->
+ ok;
+expect_uris(Config, Node, URIs) ->
+ [Link] = rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_federation_status, status, []),
+ URI = rabbit_misc:pget(uri, Link),
+ kill_only_connection(Config, Node),
+ expect_uris(Config, Node, URIs -- [URI]).
+
+kill_only_connection(Config, Node) ->
+ case connection_pids(Config, Node) of
+ [Pid] -> catch rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_networking, close_connection, [Pid, "boom"]), %% [1]
+ wait_for_pid_to_die(Config, Node, Pid);
+ _ -> timer:sleep(100),
+ kill_only_connection(Config, Node)
+ end.
+
+%% [1] the catch is because we could still see a connection from a
+%% previous time round. If so that's fine (we'll just loop around
+%% again) but we don't want the test to fail because a connection
+%% closed as we were trying to close it.
+
+wait_for_pid_to_die(Config, Node, Pid) ->
+ case connection_pids(Config, Node) of
+ [Pid] -> timer:sleep(100),
+ wait_for_pid_to_die(Config, Node, Pid);
+ _ -> ok
+ end.
+
+
+multiple_downstreams(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
+ Q12 = bind_queue(Ch, <<"fed12.downstream2">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key">>, 2),
+ await_binding(Config, 0, <<"upstream2">>, <<"key">>),
+ publish(Ch, <<"upstream">>, <<"key">>, <<"HELLO1">>),
+ publish(Ch, <<"upstream2">>, <<"key">>, <<"HELLO2">>),
+ expect(Ch, Q1, [<<"HELLO1">>]),
+ expect(Ch, Q12, [<<"HELLO1">>, <<"HELLO2">>])
+ end, upstream_downstream() ++
+ [x(<<"upstream2">>),
+ x(<<"fed12.downstream2">>)]).
+
+e2e(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ bind_exchange(Ch, <<"downstream2">>, <<"fed.downstream">>,
+ <<"key">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key">>),
+ Q = bind_queue(Ch, <<"downstream2">>, <<"key">>),
+ publish_expect(Ch, <<"upstream">>, <<"key">>, Q, <<"HELLO1">>)
+ end, upstream_downstream() ++ [x(<<"downstream2">>)]).
+
+unbind_on_delete(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
+ Q2 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key">>),
+ delete_queue(Ch, Q2),
+ publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>)
+ end, upstream_downstream()).
+
+unbind_on_unbind(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ Q1 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
+ Q2 = bind_queue(Ch, <<"fed.downstream">>, <<"key">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key">>),
+ unbind_queue(Ch, Q2, <<"fed.downstream">>, <<"key">>),
+ publish_expect(Ch, <<"upstream">>, <<"key">>, Q1, <<"HELLO">>),
+ delete_queue(Ch, Q2)
+ end, upstream_downstream()).
+
+user_id(Config) ->
+ [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ set_policy_upstream(Config, Rabbit, <<"^test$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, 1), []),
+ Perm = fun (F, A) ->
+ ok = rpc:call(Hare,
+ rabbit_auth_backend_internal, F, A)
+ end,
+ Perm(add_user, [<<"hare-user">>, <<"hare-user">>, <<"acting-user">>]),
+ Perm(set_permissions, [<<"hare-user">>,
+ <<"/">>, <<".*">>, <<".*">>, <<".*">>,
+ <<"acting-user">>]),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
+ {ok, Conn2} = amqp_connection:start(
+ #amqp_params_network{
+ username = <<"hare-user">>,
+ password = <<"hare-user">>,
+ port = rabbit_ct_broker_helpers:get_node_config(Config, Hare,
+ tcp_port_amqp)}),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ declare_exchange(Ch2, x(<<"test">>)),
+ declare_exchange(Ch, x(<<"test">>)),
+ Q = bind_queue(Ch, <<"test">>, <<"key">>),
+ await_binding(Config, Hare, <<"test">>, <<"key">>),
+
+ Msg = #amqp_msg{props = #'P_basic'{user_id = <<"hare-user">>},
+ payload = <<"HELLO">>},
+
+ SafeUri = fun (H) ->
+ {array, [{table, Recv}]} =
+ rabbit_misc:table_lookup(
+ H, <<"x-received-from">>),
+ URI = rabbit_ct_broker_helpers:node_uri(Config, 1),
+ {longstr, URI} =
+ rabbit_misc:table_lookup(Recv, <<"uri">>)
+ end,
+ ExpectUser =
+ fun (ExpUser) ->
+ fun () ->
+ receive
+ {#'basic.deliver'{},
+ #amqp_msg{props = Props,
+ payload = Payload}} ->
+ #'P_basic'{user_id = ActUser,
+ headers = Headers} = Props,
+ SafeUri(Headers),
+ <<"HELLO">> = Payload,
+ ExpUser = ActUser
+ end
+ end
+ end,
+
+ wait_for_federation(
+ 90,
+ fun() ->
+ VHost = <<"/">>,
+ X1s = rabbit_ct_broker_helpers:rpc(
+ Config, Rabbit, rabbit_exchange, list, [VHost]),
+ L1 =
+ [X || X <- X1s,
+ X#exchange.name =:= #resource{virtual_host = VHost,
+ kind = exchange,
+ name = <<"test">>},
+ X#exchange.scratches =:= [{federation,
+ [{{<<"upstream-2">>,
+ <<"test">>},
+ <<"B">>}]}]],
+ X2s = rabbit_ct_broker_helpers:rpc(
+ Config, Hare, rabbit_exchange, list, [VHost]),
+ L2 =
+ [X || X <- X2s,
+ X#exchange.type =:= 'x-federation-upstream'],
+ [] =/= L1 andalso [] =/= L2 andalso
+ has_internal_federated_queue(Config, Hare, VHost)
+ end),
+ publish(Ch2, <<"test">>, <<"key">>, Msg),
+ expect(Ch, Q, ExpectUser(undefined)),
+
+ set_policy_upstream(Config, Rabbit, <<"^test$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, 1),
+ [{<<"trust-user-id">>, true}]),
+ wait_for_federation(
+ 90,
+ fun() ->
+ VHost = <<"/">>,
+ X1s = rabbit_ct_broker_helpers:rpc(
+ Config, Rabbit, rabbit_exchange, list, [VHost]),
+ L1 =
+ [X || X <- X1s,
+ X#exchange.name =:= #resource{virtual_host = VHost,
+ kind = exchange,
+ name = <<"test">>},
+ X#exchange.scratches =:= [{federation,
+ [{{<<"upstream-2">>,
+ <<"test">>},
+ <<"A">>}]}]],
+ X2s = rabbit_ct_broker_helpers:rpc(
+ Config, Hare, rabbit_exchange, list, [VHost]),
+ L2 =
+ [X || X <- X2s,
+ X#exchange.type =:= 'x-federation-upstream'],
+ [] =/= L1 andalso [] =/= L2 andalso
+ has_internal_federated_queue(Config, Hare, VHost)
+ end),
+ publish(Ch2, <<"test">>, <<"key">>, Msg),
+ expect(Ch, Q, ExpectUser(<<"hare-user">>)),
+
+ amqp_channel:close(Ch2),
+ amqp_connection:close(Conn2),
+
+ ok.
+
+%% In order to test that unbinds get sent we deliberately set up a
+%% broken config - with topic upstream and fanout downstream. You
+%% shouldn't really do this, but it lets us see "extra" messages that
+%% get sent.
+unbind_gets_transmitted(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ Q11 = bind_queue(Ch, <<"fed.downstream">>, <<"key1">>),
+ Q12 = bind_queue(Ch, <<"fed.downstream">>, <<"key1">>),
+ Q21 = bind_queue(Ch, <<"fed.downstream">>, <<"key2">>),
+ Q22 = bind_queue(Ch, <<"fed.downstream">>, <<"key2">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key1">>),
+ await_binding(Config, 0, <<"upstream">>, <<"key2">>),
+ [delete_queue(Ch, Q) || Q <- [Q12, Q21, Q22]],
+ publish(Ch, <<"upstream">>, <<"key1">>, <<"YES">>),
+ publish(Ch, <<"upstream">>, <<"key2">>, <<"NO">>),
+ expect(Ch, Q11, [<<"YES">>]),
+ expect_empty(Ch, Q11)
+ end, [x(<<"upstream">>),
+ x(<<"fed.downstream">>)]).
+
+no_loop(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ Q1 = bind_queue(Ch, <<"one">>, <<"key">>),
+ Q2 = bind_queue(Ch, <<"two">>, <<"key">>),
+ await_binding(Config, 0, <<"one">>, <<"key">>, 2),
+ await_binding(Config, 0, <<"two">>, <<"key">>, 2),
+ publish(Ch, <<"one">>, <<"key">>, <<"Hello from one">>),
+ publish(Ch, <<"two">>, <<"key">>, <<"Hello from two">>),
+ expect(Ch, Q1, [<<"Hello from one">>, <<"Hello from two">>]),
+ expect(Ch, Q2, [<<"Hello from one">>, <<"Hello from two">>]),
+ expect_empty(Ch, Q1),
+ expect_empty(Ch, Q2)
+ end, [x(<<"one">>),
+ x(<<"two">>)]).
+
+suffix(Config, Node, Name, XName) ->
+ rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_federation_db, get_active_suffix,
+ [xr(<<"fed.downstream">>),
+ #upstream{name = Name,
+ exchange_name = list_to_binary(XName)}, none]).
+
+restart_upstream(Config) ->
+ [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
+ Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare),
+
+ rabbit_federation_test_util:set_upstream(Config,
+ Rabbit, <<"hare">>, rabbit_ct_broker_helpers:node_uri(Config, 1)),
+ rabbit_federation_test_util:set_upstream_set(Config,
+ Rabbit, <<"upstream">>,
+ [{<<"hare">>, [{<<"exchange">>, <<"upstream">>}]}]),
+ rabbit_federation_test_util:set_policy(Config,
+ Rabbit, <<"hare">>, <<"^hare\\.">>, <<"upstream">>),
+
+ declare_exchange(Upstream, x(<<"upstream">>)),
+ declare_exchange(Downstream, x(<<"hare.downstream">>)),
+
+ Qstays = bind_queue(Downstream, <<"hare.downstream">>, <<"stays">>),
+ Qgoes = bind_queue(Downstream, <<"hare.downstream">>, <<"goes">>),
+
+ rabbit_ct_client_helpers:close_channels_and_connection(Config, Hare),
+ rabbit_ct_broker_helpers:stop_node(Config, Hare),
+
+ Qcomes = bind_queue(Downstream, <<"hare.downstream">>, <<"comes">>),
+ unbind_queue(Downstream, Qgoes, <<"hare.downstream">>, <<"goes">>),
+
+ rabbit_ct_broker_helpers:start_node(Config, Hare),
+ Upstream1 = rabbit_ct_client_helpers:open_channel(Config, Hare),
+
+ %% Wait for the link to come up and for these bindings
+ %% to be transferred
+ await_binding(Config, Hare, <<"upstream">>, <<"comes">>, 1),
+ await_binding_absent(Config, Hare, <<"upstream">>, <<"goes">>),
+ await_binding(Config, Hare, <<"upstream">>, <<"stays">>, 1),
+
+ publish(Upstream1, <<"upstream">>, <<"goes">>, <<"GOES">>),
+ publish(Upstream1, <<"upstream">>, <<"stays">>, <<"STAYS">>),
+ publish(Upstream1, <<"upstream">>, <<"comes">>, <<"COMES">>),
+
+ expect(Downstream, Qstays, [<<"STAYS">>]),
+ expect(Downstream, Qcomes, [<<"COMES">>]),
+ expect_empty(Downstream, Qgoes),
+
+ delete_exchange(Downstream, <<"hare.downstream">>),
+ delete_exchange(Upstream1, <<"upstream">>),
+
+ rabbit_federation_test_util:clear_policy(Config,
+ Rabbit, <<"hare">>),
+ rabbit_federation_test_util:clear_upstream_set(Config,
+ Rabbit, <<"upstream">>),
+ rabbit_federation_test_util:clear_upstream(Config,
+ Rabbit, <<"hare">>),
+ ok.
+
+%% flopsy, mopsy and cottontail, connected in a ring with max_hops = 2
+%% for each connection. We should not see any duplicates.
+
+max_hops(Config) ->
+ [Flopsy, Mopsy, Cottontail] = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ [set_policy_upstream(Config, Downstream,
+ <<"^ring$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, Upstream),
+ [{<<"max-hops">>, 2}])
+ || {Downstream, Upstream} <- [{Flopsy, Cottontail},
+ {Mopsy, Flopsy},
+ {Cottontail, Mopsy}]],
+
+ FlopsyCh = rabbit_ct_client_helpers:open_channel(Config, Flopsy),
+ MopsyCh = rabbit_ct_client_helpers:open_channel(Config, Mopsy),
+ CottontailCh = rabbit_ct_client_helpers:open_channel(Config, Cottontail),
+
+ declare_exchange(FlopsyCh, x(<<"ring">>)),
+ declare_exchange(MopsyCh, x(<<"ring">>)),
+ declare_exchange(CottontailCh, x(<<"ring">>)),
+
+ Q1 = bind_queue(FlopsyCh, <<"ring">>, <<"key">>),
+ Q2 = bind_queue(MopsyCh, <<"ring">>, <<"key">>),
+ Q3 = bind_queue(CottontailCh, <<"ring">>, <<"key">>),
+
+ await_binding(Config, Flopsy, <<"ring">>, <<"key">>, 3),
+ await_binding(Config, Mopsy, <<"ring">>, <<"key">>, 3),
+ await_binding(Config, Cottontail, <<"ring">>, <<"key">>, 3),
+
+ publish(FlopsyCh, <<"ring">>, <<"key">>, <<"HELLO flopsy">>),
+ publish(MopsyCh, <<"ring">>, <<"key">>, <<"HELLO mopsy">>),
+ publish(CottontailCh, <<"ring">>, <<"key">>, <<"HELLO cottontail">>),
+
+ Msgs = [<<"HELLO flopsy">>, <<"HELLO mopsy">>, <<"HELLO cottontail">>],
+ expect(FlopsyCh, Q1, Msgs),
+ expect(MopsyCh, Q2, Msgs),
+ expect(CottontailCh, Q3, Msgs),
+ expect_empty(FlopsyCh, Q1),
+ expect_empty(MopsyCh, Q2),
+ expect_empty(CottontailCh, Q3),
+ ok.
+
+%% Two nodes, federated two way with the same virtual hosts, and max_hops set to a
+%% high value.
+message_cycle_detection_case1(Config) ->
+ [Cycle1, Cycle2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [set_policy_upstream(Config, Downstream,
+ <<"^cycle$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, Upstream),
+ [{<<"max-hops">>, 10}])
+ || {Downstream, Upstream} <- [{Cycle1, Cycle2}, {Cycle2, Cycle1}]],
+
+ Cycle1Ch = rabbit_ct_client_helpers:open_channel(Config, Cycle1),
+ Cycle2Ch = rabbit_ct_client_helpers:open_channel(Config, Cycle2),
+
+ declare_exchange(Cycle1Ch, x(<<"cycle">>)),
+ declare_exchange(Cycle2Ch, x(<<"cycle">>)),
+
+ Q1 = bind_queue(Cycle1Ch, <<"cycle">>, <<"cycle_detection-key">>),
+ Q2 = bind_queue(Cycle2Ch, <<"cycle">>, <<"cycle_detection-key">>),
+
+ %% "key" present twice because once for the local queue and once
+ %% for federation in each case
+ await_binding(Config, Cycle1, <<"cycle">>, <<"cycle_detection-key">>, 2),
+ await_binding(Config, Cycle2, <<"cycle">>, <<"cycle_detection-key">>, 2),
+
+ publish(Cycle1Ch, <<"cycle">>, <<"cycle_detection-key">>, <<"HELLO1">>),
+ publish(Cycle2Ch, <<"cycle">>, <<"cycle_detection-key">>, <<"HELLO2">>),
+
+ Msgs = [<<"HELLO1">>, <<"HELLO2">>],
+ expect(Cycle1Ch, Q1, Msgs),
+ expect(Cycle2Ch, Q2, Msgs),
+ expect_empty(Cycle1Ch, Q1),
+ expect_empty(Cycle2Ch, Q2),
+
+ ok.
+
+node_uri_with_virtual_host(Config, Vhost) ->
+ node_uri_with_virtual_host(Config, 0, Vhost).
+
+node_uri_with_virtual_host(Config, Node, Vhost) ->
+ NodeURI = rabbit_ct_broker_helpers:node_uri(Config, Node),
+ <<NodeURI/binary, "/", Vhost/binary>>.
+
+upstream_policy_defs(Upstream) ->
+ maps:to_list(#{<<"federation-upstream">> => Upstream}).
+
+%% Exchange federation between three local virtual hosts, A -> B -> C,
+%% propagates messages from A to C with a high enough max-hops value
+message_cycle_detection_case2(Config) ->
+ VH1 = <<"cycles.a">>,
+ VH2 = <<"cycles.b">>,
+ VH3 = <<"cycles.c">>,
+ [begin
+ rabbit_ct_broker_helpers:add_vhost(Config, V),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, V)
+ end || V <- [VH1, VH2, VH3]],
+
+ %% make sure that cycle detection does not drop messages because of a limit on hops
+ UpstreamOpts = [{<<"max-hops">>, 5}],
+ %% VH1 is an upstream for VH2
+ %% VH2 is an upstream for VH3
+ UpstreamA = <<"upstream_a">>,
+ URI1 = node_uri_with_virtual_host(Config, VH1),
+ set_upstream_in_vhost(Config, 0, VH2, UpstreamA, URI1, UpstreamOpts),
+ UpstreamB = <<"upstream_b">>,
+ URI2 = node_uri_with_virtual_host(Config, VH2),
+ set_upstream_in_vhost(Config, 0, VH3, UpstreamB, URI2, UpstreamOpts),
+
+ %% policies
+ set_policy_in_vhost(Config, 0, VH2, <<"federate.x">>, <<"^federated">>, <<"exchanges">>, upstream_policy_defs(UpstreamA)),
+ set_policy_in_vhost(Config, 0, VH3, <<"federate.x">>, <<"^federated">>, <<"exchanges">>, upstream_policy_defs(UpstreamB)),
+
+ %% channels
+ VH1Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH1),
+ VH2Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH2),
+ VH3Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH3),
+
+ {ok, VH1Ch} = amqp_connection:open_channel(VH1Conn),
+ {ok, VH2Ch} = amqp_connection:open_channel(VH2Conn),
+ {ok, VH3Ch} = amqp_connection:open_channel(VH3Conn),
+
+ X = <<"federated.x">>,
+ declare_exchange(VH3Ch, x(X, <<"fanout">>)),
+ declare_exchange(VH2Ch, x(X, <<"fanout">>)),
+ declare_exchange(VH1Ch, x(X, <<"fanout">>)),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ LinksInB = federation_links_in_vhost(Config, 0, VH2),
+ LinksInC = federation_links_in_vhost(Config, 0, VH3),
+ length(LinksInB) =:= 1 andalso
+ length(LinksInC) =:= 1 andalso
+ [running] =:= status_fields(status, LinksInB ++ LinksInC)
+ end),
+
+ Statuses = federation_links_in_vhost(Config, 0, VH2) ++ federation_links_in_vhost(Config, 0, VH3),
+
+ ?assertEqual(lists:usort([URI1, URI2]),
+ status_fields(uri, Statuses)),
+ ?assertEqual(lists:usort([<<"federated.x">>]),
+ status_fields(upstream_exchange, Statuses)),
+ ?assertEqual(lists:usort([VH2, VH3]),
+ status_fields(vhost, Statuses)),
+ ?assertEqual(lists:usort([exchange]),
+ status_fields(type, Statuses)),
+
+ %% give links some time to set up their topology
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ ExchangesInA = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [VH1]),
+ ExchangesInB = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [VH2]),
+ length(ExchangesInA) >= 1 andalso
+ length(ExchangesInB) >= 1
+ end),
+
+ RK = <<"doesn't matter">>,
+ Q = bind_queue(VH3Ch, X, RK),
+ ?assertEqual(ok, await_binding(Config, 0, VH2, X, RK, 1)),
+ ?assertEqual(ok, await_binding(Config, 0, VH3, X, RK, 1)),
+ timer:sleep(2000),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ ExchangesInA = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_exchange, list, [VH1]),
+ lists:any(fun(#exchange{name = XName}) ->
+ XName =:= rabbit_misc:r(VH1, exchange, X)
+ end, ExchangesInA)
+ end),
+
+ Payload1 = <<"msg1">>,
+ Payload2 = <<"msg2">>,
+ publish(VH1Ch, X, RK, Payload1),
+ publish(VH1Ch, X, RK, Payload2),
+
+ Msgs = [Payload1, Payload2],
+ %% payloads published to a federated exchange in A reach a queue in C
+ expect(VH3Ch, Q, Msgs, 10000),
+
+ [amqp_connection:close(Conn) || Conn <- [VH1Conn, VH2Conn, VH3Conn]],
+ [rabbit_ct_broker_helpers:delete_vhost(Config, Vhost) || Vhost <- [VH1, VH2, VH3]],
+ ok.
+
+%% Arrows indicate message flow. Numbers indicate max_hops.
+%%
+%% Dylan ---1--> Bugs ---2--> Jessica
+%% |^ |^
+%% |\--------------1---------------/|
+%% \---------------1----------------/
+%%
+%%
+%% We want to demonstrate that if we bind a queue locally at each
+%% broker, (exactly) the following bindings propagate:
+%%
+%% Bugs binds to Dylan
+%% Jessica binds to Bugs, which then propagates on to Dylan
+%% Jessica binds to Dylan directly
+%% Dylan binds to Jessica.
+%%
+%% i.e. Dylan has two bindings from Jessica and one from Bugs
+%% Bugs has one binding from Jessica
+%% Jessica has one binding from Dylan
+%%
+%% So we tag each binding with its original broker and see how far it gets
+%%
+%% Also we check that when we tear down the original bindings
+%% that we get rid of everything again.
+
+binding_propagation(Config) ->
+ [Dylan, Bugs, Jessica] = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ set_policy_upstream(Config, Dylan, <<"^x$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, Jessica), []),
+ set_policy_upstream(Config, Bugs, <<"^x$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, Dylan), []),
+ set_policy_upstreams(Config, Jessica, <<"^x$">>, [
+ {rabbit_ct_broker_helpers:node_uri(Config, Dylan), []},
+ {rabbit_ct_broker_helpers:node_uri(Config, Bugs),
+ [{<<"max-hops">>, 2}]}
+ ]),
+ DylanCh = rabbit_ct_client_helpers:open_channel(Config, Dylan),
+ BugsCh = rabbit_ct_client_helpers:open_channel(Config, Bugs),
+ JessicaCh = rabbit_ct_client_helpers:open_channel(Config, Jessica),
+
+ declare_exchange(DylanCh, x(<<"x">>)),
+ declare_exchange(BugsCh, x(<<"x">>)),
+ declare_exchange(JessicaCh, x(<<"x">>)),
+
+ Q1 = bind_queue(DylanCh, <<"x">>, <<"dylan">>),
+ Q2 = bind_queue(BugsCh, <<"x">>, <<"bugs">>),
+ Q3 = bind_queue(JessicaCh, <<"x">>, <<"jessica">>),
+
+ await_binding(Config, Dylan, <<"x">>, <<"jessica">>, 2),
+ await_bindings(Config, Dylan, <<"x">>, [<<"bugs">>, <<"dylan">>]),
+ await_bindings(Config, Bugs, <<"x">>, [<<"jessica">>, <<"bugs">>]),
+ await_bindings(Config, Jessica, <<"x">>, [<<"dylan">>, <<"jessica">>]),
+
+ delete_queue(DylanCh, Q1),
+ delete_queue(BugsCh, Q2),
+ delete_queue(JessicaCh, Q3),
+
+ await_bindings(Config, Dylan, <<"x">>, []),
+ await_bindings(Config, Bugs, <<"x">>, []),
+ await_bindings(Config, Jessica, <<"x">>, []),
+
+ ok.
+
+upstream_has_no_federation(Config) ->
+ [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ set_policy_upstream(Config, Rabbit, <<"^test$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, Hare), []),
+ Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
+ Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare),
+ declare_exchange(Upstream, x(<<"test">>)),
+ declare_exchange(Downstream, x(<<"test">>)),
+ Q = bind_queue(Downstream, <<"test">>, <<"routing">>),
+ await_binding(Config, Hare, <<"test">>, <<"routing">>),
+ publish(Upstream, <<"test">>, <<"routing">>, <<"HELLO">>),
+ expect(Downstream, Q, [<<"HELLO">>]),
+ ok.
+
+dynamic_reconfiguration(Config) ->
+ with_ch(Config,
+ fun (_Ch) ->
+ Xs = [<<"all.fed1">>, <<"all.fed2">>],
+ %% Left from the conf we set up for previous tests
+ assert_connections(Config, 0, Xs, [<<"localhost">>, <<"local5673">>]),
+
+ %% Test that clearing connections works
+ clear_upstream(Config, 0, <<"localhost">>),
+ clear_upstream(Config, 0, <<"local5673">>),
+ assert_connections(Config, 0, Xs, []),
+
+ %% Test that readding them and changing them works
+ set_upstream(Config, 0,
+ <<"localhost">>, rabbit_ct_broker_helpers:node_uri(Config, 0)),
+ %% Do it twice so we at least hit the no-restart optimisation
+ URI = rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]),
+ set_upstream(Config, 0, <<"localhost">>, URI),
+ set_upstream(Config, 0, <<"localhost">>, URI),
+ assert_connections(Config, 0, Xs, [<<"localhost">>]),
+
+ %% And re-add the last - for next test
+ rabbit_federation_test_util:setup_federation(Config)
+ end, [x(<<"all.fed1">>), x(<<"all.fed2">>)]).
+
+dynamic_reconfiguration_integrity(Config) ->
+ with_ch(Config,
+ fun (_Ch) ->
+ Xs = [<<"new.fed1">>, <<"new.fed2">>],
+
+ %% Declared exchanges with nonexistent set - no links
+ assert_connections(Config, 0, Xs, []),
+
+ %% Create the set - links appear
+ set_upstream_set(Config, 0, <<"new-set">>, [{<<"localhost">>, []}]),
+ assert_connections(Config, 0, Xs, [<<"localhost">>]),
+
+ %% Add nonexistent connections to set - nothing breaks
+ set_upstream_set(Config, 0,
+ <<"new-set">>, [{<<"localhost">>, []},
+ {<<"does-not-exist">>, []}]),
+ assert_connections(Config, 0, Xs, [<<"localhost">>]),
+
+ %% Change connection in set - links change
+ set_upstream_set(Config, 0, <<"new-set">>, [{<<"local5673">>, []}]),
+ assert_connections(Config, 0, Xs, [<<"local5673">>])
+ end, [x(<<"new.fed1">>), x(<<"new.fed2">>)]).
+
+delete_federated_exchange_upstream(Config) ->
+ %% If two exchanges in different virtual hosts have the same name, only one should be deleted.
+ VH1 = <<"federation-downstream1">>,
+ rabbit_ct_broker_helpers:delete_vhost(Config, VH1),
+ rabbit_ct_broker_helpers:add_vhost(Config, VH1),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH1),
+ VH2 = <<"federation-downstream2">>,
+ rabbit_ct_broker_helpers:delete_vhost(Config, VH2),
+ rabbit_ct_broker_helpers:add_vhost(Config, VH2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH2),
+
+ Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH1),
+ Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH2),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ #'exchange.declare_ok'{} = declare_exchange(Ch1, #'exchange.declare'{exchange = <<"federated.topic">>,
+ type = <<"topic">>,
+ durable = true}),
+ #'exchange.declare_ok'{} = declare_exchange(Ch2, #'exchange.declare'{exchange = <<"federated.topic">>,
+ type = <<"topic">>,
+ durable = true}),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_policy, set,
+ [VH1,
+ <<"federation">>, <<"^federated\.">>,
+ [{<<"federation-upstream-set">>, <<"all">>}],
+ 0, <<"exchanges">>, <<"acting-user">>]),
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_policy, set,
+ [VH2,
+ <<"federation">>, <<"^federated\.">>,
+ [{<<"federation-upstream-set">>, <<"all">>}],
+ 0, <<"exchanges">>, <<"acting-user">>]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0, VH1,
+ <<"federation-upstream">>, <<"upstream">>,
+ [{<<"uri">>, node_uri_with_virtual_host(Config, VH2)}]),
+ rabbit_ct_broker_helpers:set_parameter(Config, 0, VH2,
+ <<"federation-upstream">>, <<"upstream">>,
+ [{<<"uri">>, node_uri_with_virtual_host(Config, VH1)}]),
+
+ ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))),
+ ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH2))),
+
+ rabbit_ct_broker_helpers:clear_parameter(Config, 0, VH2,
+ <<"federation-upstream">>, <<"upstream">>),
+
+ %% one link is still around
+ ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))),
+ ?assertEqual(0, length(federation_links_in_vhost(Config, 0, VH2))),
+ LinksInVH1 = federation_links_in_vhost(Config, 0, VH1),
+ ?assertEqual(VH1, proplists:get_value(vhost, hd(LinksInVH1))),
+
+ [rabbit_ct_broker_helpers:delete_vhost(Config, Val) || Val <- [VH1, VH2]].
+
+delete_federated_queue_upstream(Config) ->
+ %% If two queues in different virtual hosts have the same name, only one should be deleted.
+ VH1 = <<"federation-downstream1">>,
+ rabbit_ct_broker_helpers:delete_vhost(Config, VH1),
+ rabbit_ct_broker_helpers:add_vhost(Config, VH1),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH1),
+ VH2 = <<"federation-downstream2">>,
+ rabbit_ct_broker_helpers:delete_vhost(Config, VH2),
+ rabbit_ct_broker_helpers:add_vhost(Config, VH2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VH2),
+
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_policy, set,
+ [VH1,
+ <<"federation">>, <<"^federated\.">>,
+ [{<<"federation-upstream-set">>, <<"all">>}],
+ 0, <<"queues">>, <<"acting-user">>]),
+ rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_policy, set,
+ [VH2,
+ <<"federation">>, <<"^federated\.">>,
+ [{<<"federation-upstream-set">>, <<"all">>}],
+ 0, <<"queues">>, <<"acting-user">>]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0, VH1,
+ <<"federation-upstream">>, <<"upstream">>,
+ [{<<"uri">>, node_uri_with_virtual_host(Config, VH2)}]),
+ rabbit_ct_broker_helpers:set_parameter(Config, 0, VH2,
+ <<"federation-upstream">>, <<"upstream">>,
+ [{<<"uri">>, node_uri_with_virtual_host(Config, VH1)}]),
+
+ Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH1),
+ Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VH2),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ #'queue.declare_ok'{} = declare_queue(Ch1,
+ #'queue.declare'{queue = <<"federated.queue">>,
+ durable = true}),
+ #'queue.declare_ok'{} = declare_queue(Ch2,
+ #'queue.declare'{queue = <<"federated.queue">>,
+ durable = true}),
+
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(federation_links_in_vhost(Config, 0, VH1)) > 0 andalso
+ length(federation_links_in_vhost(Config, 0, VH2)) > 0
+ end),
+
+ ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))),
+ ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH2))),
+
+ rabbit_ct_broker_helpers:clear_parameter(Config, 0, VH2,
+ <<"federation-upstream">>, <<"upstream">>),
+
+ ?assertEqual(1, length(federation_links_in_vhost(Config, 0, VH1))),
+ ?assertEqual(0, length(federation_links_in_vhost(Config, 0, VH2))),
+ LinksInVH1 = federation_links_in_vhost(Config, 0, VH1),
+ ?assertEqual(VH1, proplists:get_value(vhost, hd(LinksInVH1))),
+
+ [rabbit_ct_broker_helpers:delete_vhost(Config, Val) || Val <- [VH1, VH2]].
+
+federate_unfederate(Config) ->
+ with_ch(Config,
+ fun (_Ch) ->
+ Xs = [<<"dyn.exch1">>, <<"dyn.exch2">>],
+
+ %% Declareda non-federated exchanges - no links
+ assert_connections(Config, 0, Xs, []),
+
+ %% Federate them - links appear
+ set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"all">>),
+ assert_connections(Config, 0, Xs, [<<"localhost">>, <<"local5673">>]),
+
+ %% Change policy - links change
+ set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>),
+ assert_connections(Config, 0, Xs, [<<"localhost">>]),
+
+ %% Unfederate them - links disappear
+ clear_policy(Config, 0, <<"dyn">>),
+ assert_connections(Config, 0, Xs, [])
+ end, [x(<<"dyn.exch1">>), x(<<"dyn.exch2">>)]).
+
+dynamic_plugin_stop_start(Config) ->
+ X1 = <<"dyn.exch1">>,
+ X2 = <<"dyn.exch2">>,
+ with_ch(Config,
+ fun (Ch) ->
+ set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>),
+
+ %% Declare federated exchange - get link
+ assert_connections(Config, 0, [X1], [<<"localhost">>]),
+
+ %% Disable plugin, link goes
+ ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0,
+ "rabbitmq_federation"),
+ %% We can't check with status for obvious reasons...
+ undefined = rabbit_ct_broker_helpers:rpc(Config, 0,
+ erlang, whereis, [rabbit_federation_sup]),
+ {error, not_found} = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_registry, lookup_module,
+ [exchange, 'x-federation-upstream']),
+
+ %% Create exchange then re-enable plugin, links appear
+ declare_exchange(Ch, x(X2)),
+ ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0,
+ "rabbitmq_federation"),
+ assert_connections(Config, 0, [X1, X2], [<<"localhost">>]),
+ {ok, _} = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_registry, lookup_module,
+ [exchange, 'x-federation-upstream']),
+
+ wait_for_federation(
+ 90,
+ fun() ->
+ VHost = <<"/">>,
+ Xs = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_exchange, list, [VHost]),
+ L1 =
+ [X || X <- Xs,
+ X#exchange.type =:= 'x-federation-upstream'],
+ L2 =
+ [X || X <- Xs,
+ X#exchange.name =:= #resource{
+ virtual_host = VHost,
+ kind = exchange,
+ name = X1},
+ X#exchange.scratches =:= [{federation,
+ [{{<<"localhost">>,
+ X1},
+ <<"A">>}]}]],
+ L3 =
+ [X || X <- Xs,
+ X#exchange.name =:= #resource{
+ virtual_host = VHost,
+ kind = exchange,
+ name = X2},
+ X#exchange.scratches =:= [{federation,
+ [{{<<"localhost">>,
+ X2},
+ <<"B">>}]}]],
+ length(L1) =:= 2 andalso [] =/= L2 andalso [] =/= L3 andalso
+ has_internal_federated_queue(Config, 0, VHost)
+ end),
+
+ %% Test both exchanges work. They are just federated to
+ %% themselves so should duplicate messages.
+ [begin
+ Q = bind_queue(Ch, X, <<"key">>),
+ await_binding(Config, 0, X, <<"key">>, 2),
+ publish(Ch, X, <<"key">>, <<"HELLO">>),
+ expect(Ch, Q, [<<"HELLO">>, <<"HELLO">>]),
+ delete_queue(Ch, Q)
+ end || X <- [X1, X2]],
+
+ clear_policy(Config, 0, <<"dyn">>),
+ assert_connections(Config, 0, [X1, X2], []),
+ delete_exchange(Ch, X2)
+ end, [x(X1)]).
+
+dynamic_plugin_cleanup_stop_start(Config) ->
+ X1 = <<"dyn.exch1">>,
+ with_ch(Config,
+ fun (_Ch) ->
+ set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>),
+
+ %% Declare a federated exchange, a link starts
+ assert_connections(Config, 0, [X1], [<<"localhost">>]),
+ wait_for_federation(
+ 90,
+ fun() ->
+ VHost = <<"/">>,
+ Xs = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_exchange, list, [VHost]),
+ L1 =
+ [X || X <- Xs,
+ X#exchange.type =:= 'x-federation-upstream'],
+ L2 =
+ [X || X <- Xs,
+ X#exchange.name =:= #resource{
+ virtual_host = VHost,
+ kind = exchange,
+ name = X1},
+ X#exchange.scratches =:= [{federation,
+ [{{<<"localhost">>,
+ X1},
+ <<"B">>}]}]],
+ [] =/= L1 andalso [] =/= L2 andalso
+ has_internal_federated_queue(Config, 0, VHost)
+ end),
+
+ ?assert(has_internal_federated_exchange(Config, 0, <<"/">>)),
+ ?assert(has_internal_federated_queue(Config, 0, <<"/">>)),
+
+ %% Disable plugin, link goes
+ ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0,
+ "rabbitmq_federation"),
+
+ %% Internal exchanges and queues need cleanup
+ ?assert(not has_internal_federated_exchange(Config, 0, <<"/">>)),
+ ?assert(not has_internal_federated_queue(Config, 0, <<"/">>)),
+
+ ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0,
+ "rabbitmq_federation"),
+ clear_policy(Config, 0, <<"dyn">>),
+ assert_connections(Config, 0, [X1], [])
+ end, [x(X1)]).
+
+dynamic_policy_cleanup(Config) ->
+ X1 = <<"dyn.exch1">>,
+ with_ch(Config,
+ fun (_Ch) ->
+ set_policy(Config, 0, <<"dyn">>, <<"^dyn\\.">>, <<"localhost">>),
+
+ %% Declare federated exchange - get link
+ assert_connections(Config, 0, [X1], [<<"localhost">>]),
+ wait_for_federation(
+ 90,
+ fun() ->
+ VHost = <<"/">>,
+ Xs = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_exchange, list, [VHost]),
+ L1 =
+ [X || X <- Xs,
+ X#exchange.type =:= 'x-federation-upstream'],
+ L2 =
+ [X || X <- Xs,
+ X#exchange.name =:= #resource{
+ virtual_host = VHost,
+ kind = exchange,
+ name = X1},
+ X#exchange.scratches =:= [{federation,
+ [{{<<"localhost">>,
+ X1},
+ <<"B">>}]}]],
+ [] =/= L1 andalso [] =/= L2 andalso
+ has_internal_federated_queue(Config, 0, VHost)
+ end),
+
+ ?assert(has_internal_federated_exchange(Config, 0, <<"/">>)),
+ ?assert(has_internal_federated_queue(Config, 0, <<"/">>)),
+
+ clear_policy(Config, 0, <<"dyn">>),
+ timer:sleep(5000),
+
+ %% Internal exchanges and queues need cleanup
+ ?assert(not has_internal_federated_exchange(Config, 0, <<"/">>)),
+ ?assert(not has_internal_federated_queue(Config, 0, <<"/">>)),
+
+ clear_policy(Config, 0, <<"dyn">>),
+ assert_connections(Config, 0, [X1], [])
+ end, [x(X1)]).
+
+has_internal_federated_exchange(Config, Node, VHost) ->
+ lists:any(fun(X) ->
+ X#exchange.type == 'x-federation-upstream'
+ end, rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_exchange, list, [VHost])).
+
+has_internal_federated_queue(Config, Node, VHost) ->
+ lists:any(
+ fun(Q) ->
+ {'longstr', <<"federation">>} ==
+ rabbit_misc:table_lookup(amqqueue:get_arguments(Q), <<"x-internal-purpose">>)
+ end, rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_amqqueue, list, [VHost])).
+
+%%----------------------------------------------------------------------------
+
+with_ch(Config, Fun, Xs) ->
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ declare_all(Ch, Xs),
+ rabbit_federation_test_util:assert_status(Config, 0,
+ Xs, {exchange, upstream_exchange}),
+ Fun(Ch),
+ delete_all(Ch, Xs),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ cleanup(Config, 0),
+ ok.
+
+cleanup(Config, Node) ->
+ [rabbit_ct_broker_helpers:rpc(
+ Config, Node, rabbit_amqqueue, delete, [Q, false, false,
+ <<"acting-user">>]) ||
+ Q <- queues(Config, Node)].
+
+queues(Config, Node) ->
+ Ret = rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_amqqueue, list, [<<"/">>]),
+ case Ret of
+ {badrpc, _} -> [];
+ Qs -> Qs
+ end.
+
+stop_other_node(Config, Node) ->
+ cleanup(Config, Node),
+ rabbit_federation_test_util:stop_other_node(Config, Node).
+
+declare_all(Ch, Xs) -> [declare_exchange(Ch, X) || X <- Xs].
+delete_all(Ch, Xs) ->
+ [delete_exchange(Ch, X) || #'exchange.declare'{exchange = X} <- Xs].
+
+declare_exchange(Ch, X) ->
+ amqp_channel:call(Ch, X).
+
+x(Name) -> x(Name, <<"topic">>).
+
+x(Name, Type) ->
+ #'exchange.declare'{exchange = Name,
+ type = Type,
+ durable = true}.
+
+xr(Name) ->
+ rabbit_misc:r(<<"/">>, exchange, Name).
+
+xr(Vhost, Name) ->
+ rabbit_misc:r(Vhost, exchange, Name).
+
+declare_queue(Ch) ->
+ #'queue.declare_ok'{queue = Q} =
+ amqp_channel:call(Ch, #'queue.declare'{exclusive = true}),
+ Q.
+
+declare_queue(Ch, Q) ->
+ amqp_channel:call(Ch, Q).
+
+bind_queue(Ch, Q, X, Key) ->
+ amqp_channel:call(Ch, #'queue.bind'{queue = Q,
+ exchange = X,
+ routing_key = Key}).
+
+unbind_queue(Ch, Q, X, Key) ->
+ amqp_channel:call(Ch, #'queue.unbind'{queue = Q,
+ exchange = X,
+ routing_key = Key}).
+
+bind_exchange(Ch, D, S, Key) ->
+ amqp_channel:call(Ch, #'exchange.bind'{destination = D,
+ source = S,
+ routing_key = Key}).
+
+bind_queue(Ch, X, Key) ->
+ Q = declare_queue(Ch),
+ bind_queue(Ch, Q, X, Key),
+ Q.
+
+delete_exchange(Ch, X) ->
+ amqp_channel:call(Ch, #'exchange.delete'{exchange = X}).
+
+delete_queue(Ch, Q) ->
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
+
+await_binding(Config, Node, X, Key) ->
+ await_binding(Config, Node, X, Key, 1).
+
+await_binding(Config, Node, X, Key, ExpectedBindingCount) when is_integer(ExpectedBindingCount) ->
+ await_binding(Config, Node, <<"/">>, X, Key, ExpectedBindingCount).
+
+await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount) when is_integer(ExpectedBindingCount) ->
+ Attempts = 100,
+ await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount, Attempts).
+
+await_binding(_Config, _Node, _Vhost, _X, _Key, ExpectedBindingCount, 0) ->
+ {error, rabbit_misc:format("expected ~s bindings but they did not materialize in time", [ExpectedBindingCount])};
+await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount, AttemptsLeft) when is_integer(ExpectedBindingCount) ->
+ case bound_keys_from(Config, Node, Vhost, X, Key) of
+ Bs when length(Bs) < ExpectedBindingCount ->
+ timer:sleep(100),
+ await_binding(Config, Node, Vhost, X, Key, ExpectedBindingCount, AttemptsLeft - 1);
+ Bs when length(Bs) =:= ExpectedBindingCount ->
+ ok;
+ Bs ->
+ {error, rabbit_misc:format("expected ~b bindings, got ~b", [ExpectedBindingCount, length(Bs)])}
+ end.
+
+await_bindings(Config, Node, X, Keys) ->
+ [await_binding(Config, Node, X, Key) || Key <- Keys].
+
+await_binding_absent(Config, Node, X, Key) ->
+ case bound_keys_from(Config, Node, <<"/">>, X, Key) of
+ [] -> ok;
+ _ -> timer:sleep(100),
+ await_binding_absent(Config, Node, X, Key)
+ end.
+
+bound_keys_from(Config, Node, Vhost, X, Key) ->
+ List = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_binding,
+ list_for_source, [xr(Vhost, X)]),
+ [K || #binding{key = K} <- List, K =:= Key].
+
+publish(Ch, X, Key, Payload) when is_binary(Payload) ->
+ publish(Ch, X, Key, #amqp_msg{payload = Payload});
+
+publish(Ch, X, Key, Msg = #amqp_msg{}) ->
+ amqp_channel:call(Ch, #'basic.publish'{exchange = X,
+ routing_key = Key}, Msg).
+
+publish_expect(Ch, X, Key, Q, Payload) ->
+ publish(Ch, X, Key, Payload),
+ expect(Ch, Q, [Payload]).
+
+%%----------------------------------------------------------------------------
+
+assert_connections(Config, Node, Xs, Conns) ->
+ rabbit_ct_broker_helpers:rpc(Config, Node,
+ ?MODULE, assert_connections1, [Xs, Conns]).
+
+assert_connections1(Xs, Conns) ->
+ Links = [{X, C, X} ||
+ X <- Xs,
+ C <- Conns],
+ Remaining = lists:foldl(
+ fun (Link, Status) ->
+ rabbit_federation_test_util:assert_link_status(
+ Link, Status, {exchange, upstream_exchange})
+ end, rabbit_federation_status:status(), Links),
+ [] = Remaining,
+ ok.
+
+connection_pids(Config, Node) ->
+ [P || [{pid, P}] <-
+ rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_networking, connection_info_all, [[pid]])].
+
+upstream_downstream() ->
+ [x(<<"upstream">>), x(<<"fed.downstream">>)].
diff --git a/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl b/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl
new file mode 100644
index 0000000000..b7702bcf02
--- /dev/null
+++ b/deps/rabbitmq_federation/test/federation_status_command_SUITE.erl
@@ -0,0 +1,168 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(federation_status_command_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(CMD, 'Elixir.RabbitMQ.CLI.Ctl.Commands.FederationStatusCommand').
+
+all() ->
+ [
+ {group, not_federated},
+ {group, federated},
+ {group, federated_down}
+ ].
+
+groups() ->
+ [
+ {not_federated, [], [
+ run_not_federated,
+ output_not_federated
+ ]},
+ {federated, [], [
+ run_federated,
+ output_federated
+ ]},
+ {federated_down, [], [
+ run_down_federated
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ Config2 = rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ Config2.
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(federated, Config) ->
+ rabbit_federation_test_util:setup_federation(Config),
+ Config;
+init_per_group(federated_down, Config) ->
+ rabbit_federation_test_util:setup_down_federation(Config),
+ Config;
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+run_not_federated(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ {stream, []} = ?CMD:run([], Opts#{only_down => false}).
+
+output_not_federated(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ {stream, []} = ?CMD:output({stream, []}, Opts).
+
+run_federated(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ %% All
+ rabbit_federation_test_util:with_ch(
+ Config,
+ fun(_) ->
+ timer:sleep(3000),
+ {stream, [Props]} = ?CMD:run([], Opts#{only_down => false}),
+ <<"upstream">> = proplists:get_value(upstream_queue, Props),
+ <<"fed.downstream">> = proplists:get_value(queue, Props),
+ <<"fed.tag">> = proplists:get_value(consumer_tag, Props),
+ running = proplists:get_value(status, Props)
+ end,
+ [rabbit_federation_test_util:q(<<"upstream">>),
+ rabbit_federation_test_util:q(<<"fed.downstream">>)]),
+ %% Down
+ rabbit_federation_test_util:with_ch(
+ Config,
+ fun(_) ->
+ {stream, []} = ?CMD:run([], Opts#{only_down => true})
+ end,
+ [rabbit_federation_test_util:q(<<"upstream">>),
+ rabbit_federation_test_util:q(<<"fed.downstream">>)]).
+
+run_down_federated(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ %% All
+ rabbit_federation_test_util:with_ch(
+ Config,
+ fun(_) ->
+ timer:sleep(3000),
+ {stream, ManyProps} = ?CMD:run([], Opts#{only_down => false}),
+ Links = [{proplists:get_value(upstream, Props),
+ proplists:get_value(status, Props)}
+ || Props <- ManyProps],
+ [{<<"broken-bunny">>, error}, {<<"localhost">>, running}]
+ = lists:sort(Links)
+ end,
+ [rabbit_federation_test_util:q(<<"upstream">>),
+ rabbit_federation_test_util:q(<<"fed.downstream">>)]),
+ %% Down
+ rabbit_federation_test_util:with_ch(
+ Config,
+ fun(_) ->
+ timer:sleep(3000),
+ {stream, [Props]} = ?CMD:run([], Opts#{only_down => true}),
+ <<"broken-bunny">> = proplists:get_value(upstream, Props),
+ error = proplists:get_value(status, Props)
+ end,
+ [rabbit_federation_test_util:q(<<"upstream">>),
+ rabbit_federation_test_util:q(<<"fed.downstream">>)]).
+
+output_federated(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ Input = {stream,[[{queue, <<"fed.downstream">>},
+ {consumer_tag, <<"fed.tag">>},
+ {upstream_queue, <<"upstream">>},
+ {type, queue},
+ {vhost, <<"/">>},
+ {upstream, <<"localhost">>},
+ {status, running},
+ {local_connection, <<"<rmq-ct-federation_status_command_SUITE-1-21000@localhost.1.563.0>">>},
+ {uri, <<"amqp://localhost:21000">>},
+ {timestamp, {{2016,11,21},{8,51,19}}}]]},
+ {stream, [#{queue := <<"fed.downstream">>,
+ upstream_queue := <<"upstream">>,
+ type := queue,
+ vhost := <<"/">>,
+ upstream := <<"localhost">>,
+ status := running,
+ local_connection := <<"<rmq-ct-federation_status_command_SUITE-1-21000@localhost.1.563.0>">>,
+ uri := <<"amqp://localhost:21000">>,
+ last_changed := <<"2016-11-21 08:51:19">>,
+ exchange := <<>>,
+ upstream_exchange := <<>>,
+ error := <<>>}]}
+ = ?CMD:output(Input, Opts).
diff --git a/deps/rabbitmq_federation/test/queue_SUITE.erl b/deps/rabbitmq_federation/test/queue_SUITE.erl
new file mode 100644
index 0000000000..5c3660fb64
--- /dev/null
+++ b/deps/rabbitmq_federation/test/queue_SUITE.erl
@@ -0,0 +1,328 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(queue_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-import(rabbit_federation_test_util,
+ [wait_for_federation/2, expect/3, expect/4,
+ set_upstream/4, set_upstream/5, clear_upstream/3, set_policy/5, clear_policy/3,
+ set_policy_pattern/5, set_policy_upstream/5, q/1, with_ch/3,
+ declare_queue/2, delete_queue/2,
+ federation_links_in_vhost/3]).
+
+-define(INITIAL_WAIT, 6000).
+-define(EXPECT_FEDERATION_TIMEOUT, 30000).
+
+all() ->
+ [
+ {group, without_disambiguate},
+ {group, with_disambiguate}
+ ].
+
+groups() ->
+ [
+ {without_disambiguate, [], [
+ {cluster_size_1, [], [
+ simple,
+ multiple_upstreams,
+ multiple_upstreams_pattern,
+ multiple_downstreams,
+ bidirectional,
+ dynamic_reconfiguration,
+ federate_unfederate,
+ dynamic_plugin_stop_start
+ ]}
+ ]},
+ {with_disambiguate, [], [
+ {cluster_size_2, [], [
+ restart_upstream
+ ]}
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(without_disambiguate, Config) ->
+ rabbit_ct_helpers:set_config(Config,
+ {disambiguate_step, []});
+init_per_group(with_disambiguate, Config) ->
+ rabbit_ct_helpers:set_config(Config,
+ {disambiguate_step, [fun rabbit_federation_test_util:disambiguate/1]});
+init_per_group(cluster_size_1 = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 1}
+ ]),
+ init_per_group1(Group, Config1);
+init_per_group(cluster_size_2 = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 2}
+ ]),
+ init_per_group1(Group, Config1).
+
+init_per_group1(Group, Config) ->
+ SetupFederation = case Group of
+ cluster_size_1 -> [fun rabbit_federation_test_util:setup_federation/1];
+ cluster_size_2 -> []
+ end,
+ Disambiguate = ?config(disambiguate_step, Config),
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Suffix},
+ {rmq_nodes_clustered, false}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ SetupFederation ++ Disambiguate).
+
+end_per_group(without_disambiguate, Config) ->
+ Config;
+end_per_group(with_disambiguate, Config) ->
+ Config;
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+simple(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
+ end, upstream_downstream()).
+
+multiple_upstreams(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ expect_federation(Ch, <<"upstream">>, <<"fed12.downstream">>),
+ expect_federation(Ch, <<"upstream2">>, <<"fed12.downstream">>)
+ end, [q(<<"upstream">>),
+ q(<<"upstream2">>),
+ q(<<"fed12.downstream">>)]).
+
+multiple_upstreams_pattern(Config) ->
+ set_upstream(Config, 0, <<"local453x">>,
+ rabbit_ct_broker_helpers:node_uri(Config, 0), [
+ {<<"exchange">>, <<"upstream">>},
+ {<<"queue">>, <<"upstream">>}]),
+
+ set_upstream(Config, 0, <<"zzzzzZZzz">>,
+ rabbit_ct_broker_helpers:node_uri(Config, 0), [
+ {<<"exchange">>, <<"upstream-zzz">>},
+ {<<"queue">>, <<"upstream-zzz">>}]),
+
+ set_upstream(Config, 0, <<"local3214x">>,
+ rabbit_ct_broker_helpers:node_uri(Config, 0), [
+ {<<"exchange">>, <<"upstream2">>},
+ {<<"queue">>, <<"upstream2">>}]),
+
+ set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>),
+
+ with_ch(Config,
+ fun (Ch) ->
+ expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
+ expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
+ end, [q(<<"upstream">>),
+ q(<<"upstream2">>),
+ q(<<"pattern.downstream">>)]),
+
+ clear_upstream(Config, 0, <<"local453x">>),
+ clear_upstream(Config, 0, <<"local3214x">>),
+ clear_policy(Config, 0, <<"pattern">>).
+
+multiple_downstreams(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ timer:sleep(?INITIAL_WAIT),
+ expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
+ expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
+ end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]).
+
+bidirectional(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ timer:sleep(?INITIAL_WAIT),
+ publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT),
+ publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT),
+ Seq = lists:seq(1, 100),
+ [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq],
+ [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq],
+ expect(Ch, <<"one">>, repeat(150, <<"bulk">>)),
+ expect(Ch, <<"two">>, repeat(50, <<"bulk">>)),
+ expect_empty(Ch, <<"one">>),
+ expect_empty(Ch, <<"two">>)
+ end, [q(<<"one">>),
+ q(<<"two">>)]).
+
+dynamic_reconfiguration(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ timer:sleep(?INITIAL_WAIT),
+ expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
+
+ %% Test that clearing connections works
+ clear_upstream(Config, 0, <<"localhost">>),
+ expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
+
+ %% Test that reading them and changing them works
+ set_upstream(Config, 0,
+ <<"localhost">>, rabbit_ct_broker_helpers:node_uri(Config, 0)),
+ %% Do it twice so we at least hit the no-restart optimisation
+ URI = rabbit_ct_broker_helpers:node_uri(Config, 0, [use_ipaddr]),
+ set_upstream(Config, 0, <<"localhost">>, URI),
+ set_upstream(Config, 0, <<"localhost">>, URI),
+ expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
+ end, upstream_downstream()).
+
+federate_unfederate(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ timer:sleep(?INITIAL_WAIT),
+ expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
+ expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT),
+
+ %% clear the policy
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"fed">>),
+
+ expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream">>),
+ expect_no_federation(Ch, <<"upstream">>, <<"fed.downstream2">>),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"fed">>, <<"^fed\.">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"upstream">>}])
+ end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]).
+
+dynamic_plugin_stop_start(Config) ->
+ DownQ2 = <<"fed.downstream2">>,
+ with_ch(Config,
+ fun (Ch) ->
+ timer:sleep(?INITIAL_WAIT),
+ UpQ = <<"upstream">>,
+ DownQ1 = <<"fed.downstream">>,
+ expect_federation(Ch, UpQ, DownQ1, ?EXPECT_FEDERATION_TIMEOUT),
+ expect_federation(Ch, UpQ, DownQ2, ?EXPECT_FEDERATION_TIMEOUT),
+
+ %% Disable the plugin, the link disappears
+ ok = rabbit_ct_broker_helpers:disable_plugin(Config, 0, "rabbitmq_federation"),
+
+ expect_no_federation(Ch, UpQ, DownQ1),
+ expect_no_federation(Ch, UpQ, DownQ2),
+
+ declare_queue(Ch, q(DownQ1)),
+ declare_queue(Ch, q(DownQ2)),
+ ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"),
+
+ %% Declare a queue then re-enable the plugin, the links appear
+ wait_for_federation(
+ 90,
+ fun() ->
+ Status = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, status, []),
+ L = [
+ Entry || Entry <- Status,
+ proplists:get_value(queue, Entry) =:= DownQ1 orelse
+ proplists:get_value(queue, Entry) =:= DownQ2,
+ proplists:get_value(upstream_queue, Entry) =:= UpQ,
+ proplists:get_value(status, Entry) =:= running
+ ],
+ length(L) =:= 2
+ end),
+ expect_federation(Ch, UpQ, DownQ1, 120000)
+ end, upstream_downstream() ++ [q(DownQ2)]).
+
+restart_upstream(Config) ->
+ [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ set_policy_upstream(Config, Rabbit, <<"^test$">>,
+ rabbit_ct_broker_helpers:node_uri(Config, Hare), []),
+
+ Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
+ Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare),
+
+ declare_queue(Upstream, q(<<"test">>)),
+ declare_queue(Downstream, q(<<"test">>)),
+ Seq = lists:seq(1, 100),
+ [publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq],
+ expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)),
+ expect(Downstream, <<"test">>, repeat(25, <<"bulk">>)),
+
+ rabbit_ct_client_helpers:close_channels_and_connection(Config, Hare),
+ ok = rabbit_ct_broker_helpers:restart_node(Config, Hare),
+ Upstream2 = rabbit_ct_client_helpers:open_channel(Config, Hare),
+
+ expect(Upstream2, <<"test">>, repeat(25, <<"bulk">>)),
+ expect(Downstream, <<"test">>, repeat(25, <<"bulk">>)),
+ expect_empty(Upstream2, <<"test">>),
+ expect_empty(Downstream, <<"test">>),
+
+ ok.
+
+%upstream_has_no_federation(Config) ->
+% %% TODO
+% ok.
+
+%%----------------------------------------------------------------------------
+repeat(Count, Item) -> [Item || _ <- lists:seq(1, Count)].
+
+%%----------------------------------------------------------------------------
+
+publish(Ch, X, Key, Payload) when is_binary(Payload) ->
+ publish(Ch, X, Key, #amqp_msg{payload = Payload});
+
+publish(Ch, X, Key, Msg = #amqp_msg{}) ->
+ amqp_channel:call(Ch, #'basic.publish'{exchange = X,
+ routing_key = Key}, Msg).
+
+publish_expect(Ch, X, Key, Q, Payload) ->
+ publish(Ch, X, Key, Payload),
+ expect(Ch, Q, [Payload]).
+
+publish_expect(Ch, X, Key, Q, Payload, Timeout) ->
+ publish(Ch, X, Key, Payload),
+ expect(Ch, Q, [Payload], Timeout).
+
+%% Doubled due to our strange basic.get behaviour.
+expect_empty(Ch, Q) ->
+ rabbit_federation_test_util:expect_empty(Ch, Q),
+ rabbit_federation_test_util:expect_empty(Ch, Q).
+
+expect_federation(Ch, UpstreamQ, DownstreamQ) ->
+ publish_expect(Ch, <<>>, UpstreamQ, DownstreamQ, <<"HELLO">>).
+
+expect_federation(Ch, UpstreamQ, DownstreamQ, Timeout) ->
+ publish_expect(Ch, <<>>, UpstreamQ, DownstreamQ, <<"HELLO">>, Timeout).
+
+expect_no_federation(Ch, UpstreamQ, DownstreamQ) ->
+ publish(Ch, <<>>, UpstreamQ, <<"HELLO">>),
+ expect_empty(Ch, DownstreamQ),
+ expect(Ch, UpstreamQ, [<<"HELLO">>]).
+
+upstream_downstream() ->
+ [q(<<"upstream">>), q(<<"fed.downstream">>)].
diff --git a/deps/rabbitmq_federation/test/rabbit_federation_status_SUITE.erl b/deps/rabbitmq_federation/test/rabbit_federation_status_SUITE.erl
new file mode 100644
index 0000000000..6b802a3f15
--- /dev/null
+++ b/deps/rabbitmq_federation/test/rabbit_federation_status_SUITE.erl
@@ -0,0 +1,129 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_status_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-include("rabbit_federation.hrl").
+
+-compile(export_all).
+
+-import(rabbit_federation_test_util,
+ [expect/3, expect_empty/2,
+ set_upstream/4, clear_upstream/3, set_upstream_set/4,
+ set_policy/5, clear_policy/3,
+ set_policy_upstream/5, set_policy_upstreams/4,
+ no_plugins/1, with_ch/3]).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ exchange_status,
+ queue_status,
+ lookup_exchange_status,
+ lookup_queue_status,
+ lookup_bad_status
+ ]}
+ ].
+
+suite() ->
+ [{timetrap, {minutes, 5}}].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_federation_test_util:setup_federation/1]).
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+exchange_status(Config) ->
+ exchange_SUITE:with_ch(
+ Config,
+ fun (_Ch) ->
+ [Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, status, []),
+ true = is_binary(proplists:get_value(id, Link))
+ end, exchange_SUITE:upstream_downstream()).
+
+queue_status(Config) ->
+ with_ch(
+ Config,
+ fun (_Ch) ->
+ timer:sleep(3000),
+ [Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, status, []),
+ true = is_binary(proplists:get_value(id, Link))
+ end, queue_SUITE:upstream_downstream()).
+
+lookup_exchange_status(Config) ->
+ exchange_SUITE:with_ch(
+ Config,
+ fun (_Ch) ->
+ [Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, status, []),
+ Id = proplists:get_value(id, Link),
+ Props = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, lookup, [Id]),
+ lists:all(fun(K) -> lists:keymember(K, 1, Props) end,
+ [key, uri, status, timestamp, id, supervisor, upstream])
+ end, exchange_SUITE:upstream_downstream()).
+
+lookup_queue_status(Config) ->
+ with_ch(
+ Config,
+ fun (_Ch) ->
+ timer:sleep(3000),
+ [Link] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, status, []),
+ Id = proplists:get_value(id, Link),
+ Props = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, lookup, [Id]),
+ lists:all(fun(K) -> lists:keymember(K, 1, Props) end,
+ [key, uri, status, timestamp, id, supervisor, upstream])
+ end, queue_SUITE:upstream_downstream()).
+
+lookup_bad_status(Config) ->
+ with_ch(
+ Config,
+ fun (_Ch) ->
+ timer:sleep(3000),
+ not_found = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_federation_status, lookup, [<<"justmadeitup">>])
+ end, queue_SUITE:upstream_downstream()).
diff --git a/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
new file mode 100644
index 0000000000..534817a2a4
--- /dev/null
+++ b/deps/rabbitmq_federation/test/rabbit_federation_test_util.erl
@@ -0,0 +1,354 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_federation_test_util).
+
+-include("rabbit_federation.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-import(rabbit_misc, [pget/2]).
+
+setup_federation(Config) ->
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream">>, <<"localhost">>, [
+ {<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)},
+ {<<"consumer-tag">>, <<"fed.tag">>}]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream">>, <<"local5673">>, [
+ {<<"uri">>, <<"amqp://localhost:1">>}]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream-set">>, <<"upstream">>, [
+ [
+ {<<"upstream">>, <<"localhost">>},
+ {<<"exchange">>, <<"upstream">>},
+ {<<"queue">>, <<"upstream">>}
+ ]
+ ]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream-set">>, <<"upstream2">>, [
+ [
+ {<<"upstream">>, <<"localhost">>},
+ {<<"exchange">>, <<"upstream2">>},
+ {<<"queue">>, <<"upstream2">>}
+ ]
+ ]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream-set">>, <<"localhost">>, [
+ [{<<"upstream">>, <<"localhost">>}]
+ ]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream-set">>, <<"upstream12">>, [
+ [
+ {<<"upstream">>, <<"localhost">>},
+ {<<"exchange">>, <<"upstream">>},
+ {<<"queue">>, <<"upstream">>}
+ ], [
+ {<<"upstream">>, <<"localhost">>},
+ {<<"exchange">>, <<"upstream2">>},
+ {<<"queue">>, <<"upstream2">>}
+ ]
+ ]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream-set">>, <<"one">>, [
+ [
+ {<<"upstream">>, <<"localhost">>},
+ {<<"exchange">>, <<"one">>},
+ {<<"queue">>, <<"one">>}
+ ]
+ ]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream-set">>, <<"two">>, [
+ [
+ {<<"upstream">>, <<"localhost">>},
+ {<<"exchange">>, <<"two">>},
+ {<<"queue">>, <<"two">>}
+ ]
+ ]),
+
+ rabbit_ct_broker_helpers:set_parameter(Config, 0,
+ <<"federation-upstream-set">>, <<"upstream5673">>, [
+ [
+ {<<"upstream">>, <<"local5673">>},
+ {<<"exchange">>, <<"upstream">>}
+ ]
+ ]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"fed">>, <<"^fed\.">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"upstream">>}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"fed12">>, <<"^fed12\.">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"upstream12">>}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"one">>, <<"^two$">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"one">>}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"two">>, <<"^one$">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"two">>}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"hare">>, <<"^hare\.">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"upstream5673">>}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"all">>, <<"^all\.">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"all">>}]),
+
+ rabbit_ct_broker_helpers:set_policy(Config, 0,
+ <<"new">>, <<"^new\.">>, <<"all">>, [
+ {<<"federation-upstream-set">>, <<"new-set">>}]),
+ Config.
+
+setup_down_federation(Config) ->
+ rabbit_ct_broker_helpers:set_parameter(
+ Config, 0, <<"federation-upstream">>, <<"broken-bunny">>,
+ [{<<"uri">>, <<"amqp://broken-bunny">>},
+ {<<"reconnect-delay">>, 600000}]),
+ rabbit_ct_broker_helpers:set_parameter(
+ Config, 0, <<"federation-upstream">>, <<"localhost">>,
+ [{<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)}]),
+ rabbit_ct_broker_helpers:set_parameter(
+ Config, 0,
+ <<"federation-upstream-set">>, <<"upstream">>,
+ [[{<<"upstream">>, <<"localhost">>},
+ {<<"exchange">>, <<"upstream">>},
+ {<<"queue">>, <<"upstream">>}],
+ [{<<"upstream">>, <<"broken-bunny">>},
+ {<<"exchange">>, <<"upstream">>},
+ {<<"queue">>, <<"upstream">>}]]),
+ rabbit_ct_broker_helpers:set_policy(
+ Config, 0,
+ <<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
+ rabbit_ct_broker_helpers:set_policy(
+ Config, 0,
+ <<"fed">>, <<"^fed\.">>, <<"all">>, [{<<"federation-upstream-set">>, <<"upstream">>}]),
+ Config.
+
+wait_for_federation(Retries, Fun) ->
+ case Fun() of
+ true ->
+ ok;
+ false when Retries > 0 ->
+ timer:sleep(1000),
+ wait_for_federation(Retries - 1, Fun);
+ false ->
+ throw({timeout_while_waiting_for_federation, Fun})
+ end.
+
+expect(Ch, Q, Fun) when is_function(Fun) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
+ no_ack = true}, self()),
+ CTag = receive
+ #'basic.consume_ok'{consumer_tag = CT} -> CT
+ end,
+ Fun(),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag});
+
+expect(Ch, Q, Payloads) ->
+ expect(Ch, Q, fun() -> expect(Payloads) end).
+
+expect(Ch, Q, Payloads, Timeout) ->
+ expect(Ch, Q, fun() -> expect(Payloads, Timeout) end).
+
+expect([]) ->
+ ok;
+expect(Payloads) ->
+ expect(Payloads, 60000).
+
+expect([], _Timeout) ->
+ ok;
+expect(Payloads, Timeout) ->
+ receive
+ {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
+ case lists:member(Payload, Payloads) of
+ true ->
+ ct:pal("Consumed a message: ~p", [Payload]),
+ expect(Payloads -- [Payload], Timeout);
+ false -> ?assert(false, rabbit_misc:format("received an unexpected payload ~p", [Payload]))
+ end
+ after Timeout ->
+ ?assert(false, rabbit_misc:format("Did not receive expected payloads ~p in time", [Payloads]))
+ end.
+
+expect_empty(Ch, Q) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{ queue = Q })).
+
+set_upstream(Config, Node, Name, URI) ->
+ set_upstream(Config, Node, Name, URI, []).
+
+set_upstream(Config, Node, Name, URI, Extra) ->
+ rabbit_ct_broker_helpers:set_parameter(Config, Node,
+ <<"federation-upstream">>, Name, [{<<"uri">>, URI} | Extra]).
+
+set_upstream_in_vhost(Config, Node, VirtualHost, Name, URI) ->
+ set_upstream_in_vhost(Config, Node, VirtualHost, Name, URI, []).
+
+set_upstream_in_vhost(Config, Node, VirtualHost, Name, URI, Extra) ->
+ rabbit_ct_broker_helpers:set_parameter(Config, Node, VirtualHost,
+ <<"federation-upstream">>, Name, [{<<"uri">>, URI} | Extra]).
+
+clear_upstream(Config, Node, Name) ->
+ rabbit_ct_broker_helpers:clear_parameter(Config, Node,
+ <<"federation-upstream">>, Name).
+
+set_upstream_set(Config, Node, Name, Set) ->
+ rabbit_ct_broker_helpers:set_parameter(Config, Node,
+ <<"federation-upstream-set">>, Name,
+ [[{<<"upstream">>, UStream} | Extra] || {UStream, Extra} <- Set]).
+
+clear_upstream_set(Config, Node, Name) ->
+ rabbit_ct_broker_helpers:clear_parameter(Config, Node,
+ <<"federation-upstream-set">>, Name).
+
+set_policy(Config, Node, Name, Pattern, UpstreamSet) ->
+ rabbit_ct_broker_helpers:set_policy(Config, Node,
+ Name, Pattern, <<"all">>,
+ [{<<"federation-upstream-set">>, UpstreamSet}]).
+
+set_policy_pattern(Config, Node, Name, Pattern, Regex) ->
+ rabbit_ct_broker_helpers:set_policy(Config, Node,
+ Name, Pattern, <<"all">>,
+ [{<<"federation-upstream-pattern">>, Regex}]).
+
+clear_policy(Config, Node, Name) ->
+ rabbit_ct_broker_helpers:clear_policy(Config, Node, Name).
+
+set_policy_upstream(Config, Node, Pattern, URI, Extra) ->
+ set_policy_upstreams(Config, Node, Pattern, [{URI, Extra}]).
+
+set_policy_upstreams(Config, Node, Pattern, URIExtras) ->
+ put(upstream_num, 1),
+ [set_upstream(Config, Node, gen_upstream_name(), URI, Extra)
+ || {URI, Extra} <- URIExtras],
+ set_policy(Config, Node, Pattern, Pattern, <<"all">>).
+
+gen_upstream_name() ->
+ list_to_binary("upstream-" ++ integer_to_list(next_upstream_num())).
+
+next_upstream_num() ->
+ R = get(upstream_num) + 1,
+ put(upstream_num, R),
+ R.
+
+%% Make sure that even though multiple nodes are in a single
+%% distributed system, we still keep all our process groups separate.
+disambiguate(Config) ->
+ rabbit_ct_broker_helpers:rpc_all(Config,
+ application, set_env,
+ [rabbitmq_federation, pgroup_name_cluster_id, true]),
+ Config.
+
+no_plugins(Cfg) ->
+ [{K, case K of
+ plugins -> none;
+ _ -> V
+ end} || {K, V} <- Cfg].
+
+%%----------------------------------------------------------------------------
+
+all_federation_links(Config, Node) ->
+ rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_federation_status, status, []).
+
+federation_links_in_vhost(Config, Node, VirtualHost) ->
+ Links = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_federation_status, status, []),
+ lists:filter(
+ fun(Link) ->
+ VirtualHost =:= proplists:get_value(vhost, Link)
+ end, Links).
+
+status_fields(Prop, Statuses) ->
+ lists:usort(
+ lists:map(
+ fun(Link) -> proplists:get_value(Prop, Link) end,
+ Statuses)).
+
+assert_status(Config, Node, XorQs, Names) ->
+ rabbit_ct_broker_helpers:rpc(Config, Node,
+ ?MODULE, assert_status1, [XorQs, Names]).
+
+assert_status1(XorQs, Names) ->
+ [begin
+ ct:pal("links(XorQ) for ~p: ~p", [XorQ, links(XorQ)])
+ end || XorQ <- XorQs],
+ Links = lists:append([links(XorQ) || XorQ <- XorQs]),
+ Remaining = lists:foldl(fun (Link, Status) ->
+ assert_link_status(Link, Status, Names)
+ end, rabbit_federation_status:status(), Links),
+ ?assertEqual([], Remaining),
+ ok.
+
+assert_link_status({DXorQNameBin, UpstreamName, UXorQNameBin}, Status,
+ {TypeName, UpstreamTypeName}) ->
+ {This, Rest} = lists:partition(
+ fun(St) ->
+ pget(upstream, St) =:= UpstreamName andalso
+ pget(TypeName, St) =:= DXorQNameBin andalso
+ pget(UpstreamTypeName, St) =:= UXorQNameBin
+ end, Status),
+ ?assertMatch([_], This),
+ Rest.
+
+links(#'exchange.declare'{exchange = Name}) ->
+ case rabbit_exchange:lookup(xr(Name)) of
+ {ok, X} ->
+ case rabbit_policy:get(<<"federation-upstream-set">>, X) of
+ undefined ->
+ case rabbit_policy:get(<<"federation-upstream-pattern">>, X) of
+ undefined -> [];
+ Regex ->
+ [{Name, U#upstream.name, U#upstream.exchange_name} ||
+ U <- rabbit_federation_upstream:from_pattern(Regex, X)]
+ end;
+ Set ->
+ [{Name, U#upstream.name, U#upstream.exchange_name} ||
+ U <- rabbit_federation_upstream:from_set(Set, X)]
+ end;
+ {error, not_found} ->
+ []
+ end.
+
+xr(Name) -> rabbit_misc:r(<<"/">>, exchange, Name).
+
+with_ch(Config, Fun, Qs) ->
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ declare_all(Ch, Qs),
+ %% Clean up queues even after test failure.
+ try
+ Fun(Ch)
+ after
+ delete_all(Ch, Qs),
+ rabbit_ct_client_helpers:close_channel(Ch)
+ end,
+ ok.
+
+declare_all(Ch, Qs) -> [declare_queue(Ch, Q) || Q <- Qs].
+delete_all(Ch, Qs) ->
+ [delete_queue(Ch, Q) || #'queue.declare'{queue = Q} <- Qs].
+
+declare_queue(Ch, Q) ->
+ amqp_channel:call(Ch, Q).
+
+delete_queue(Ch, Q) ->
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
+
+q(Name) ->
+ #'queue.declare'{queue = Name,
+ durable = true}.
diff --git a/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl b/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl
new file mode 100644
index 0000000000..511bae6540
--- /dev/null
+++ b/deps/rabbitmq_federation/test/restart_federation_link_command_SUITE.erl
@@ -0,0 +1,101 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(restart_federation_link_command_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(CMD, 'Elixir.RabbitMQ.CLI.Ctl.Commands.RestartFederationLinkCommand').
+
+all() ->
+ [
+ {group, federated_down}
+ ].
+
+groups() ->
+ [
+ {federated_down, [], [
+ run,
+ run_not_found,
+ output
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ Config2 = rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ Config2.
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(federated_down, Config) ->
+ rabbit_federation_test_util:setup_down_federation(Config),
+ Config;
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+run_not_federated(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ {stream, []} = ?CMD:run([], Opts#{'only-down' => false}).
+
+output_not_federated(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ {stream, []} = ?CMD:output({stream, []}, Opts).
+
+run(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ rabbit_federation_test_util:with_ch(
+ Config,
+ fun(_) ->
+ timer:sleep(3000),
+ [Link | _] = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_status, status, []),
+ Id = proplists:get_value(id, Link),
+ ok = ?CMD:run([Id], Opts)
+ end,
+ [rabbit_federation_test_util:q(<<"upstream">>),
+ rabbit_federation_test_util:q(<<"fed.downstream">>)]).
+
+run_not_found(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ {error, _ErrorMsg} = ?CMD:run([<<"MakingItUp">>], Opts).
+
+output(Config) ->
+ [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Opts = #{node => A},
+ ok = ?CMD:output(ok, Opts).
diff --git a/deps/rabbitmq_federation/test/unit_SUITE.erl b/deps/rabbitmq_federation/test/unit_SUITE.erl
new file mode 100644
index 0000000000..7d0707f96c
--- /dev/null
+++ b/deps/rabbitmq_federation/test/unit_SUITE.erl
@@ -0,0 +1,65 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2019-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(unit_SUITE).
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_federation.hrl").
+
+-compile(export_all).
+
+all() -> [
+ obfuscate_upstream,
+ obfuscate_upstream_params_network,
+ obfuscate_upstream_params_network_with_char_list_password_value,
+ obfuscate_upstream_params_direct
+].
+
+init_per_suite(Config) ->
+ application:ensure_all_started(credentials_obfuscation),
+ Config.
+
+end_per_suite(Config) ->
+ Config.
+
+obfuscate_upstream(_Config) ->
+ Upstream = #upstream{uris = [<<"amqp://guest:password@localhost">>]},
+ ObfuscatedUpstream = rabbit_federation_util:obfuscate_upstream(Upstream),
+ ?assertEqual(Upstream, rabbit_federation_util:deobfuscate_upstream(ObfuscatedUpstream)),
+ ok.
+
+obfuscate_upstream_params_network(_Config) ->
+ UpstreamParams = #upstream_params{
+ uri = <<"amqp://guest:password@localhost">>,
+ params = #amqp_params_network{password = <<"password">>}
+ },
+ ObfuscatedUpstreamParams = rabbit_federation_util:obfuscate_upstream_params(UpstreamParams),
+ ?assertEqual(UpstreamParams, rabbit_federation_util:deobfuscate_upstream_params(ObfuscatedUpstreamParams)),
+ ok.
+
+obfuscate_upstream_params_network_with_char_list_password_value(_Config) ->
+ Input = #upstream_params{
+ uri = <<"amqp://guest:password@localhost">>,
+ params = #amqp_params_network{password = "password"}
+ },
+ Output = #upstream_params{
+ uri = <<"amqp://guest:password@localhost">>,
+ params = #amqp_params_network{password = <<"password">>}
+ },
+ ObfuscatedUpstreamParams = rabbit_federation_util:obfuscate_upstream_params(Input),
+ ?assertEqual(Output, rabbit_federation_util:deobfuscate_upstream_params(ObfuscatedUpstreamParams)),
+ ok.
+
+ obfuscate_upstream_params_direct(_Config) ->
+ UpstreamParams = #upstream_params{
+ uri = <<"amqp://guest:password@localhost">>,
+ params = #amqp_params_direct{password = <<"password">>}
+ },
+ ObfuscatedUpstreamParams = rabbit_federation_util:obfuscate_upstream_params(UpstreamParams),
+ ?assertEqual(UpstreamParams, rabbit_federation_util:deobfuscate_upstream_params(ObfuscatedUpstreamParams)),
+ ok.
diff --git a/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl b/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl
new file mode 100644
index 0000000000..f65dffbe8e
--- /dev/null
+++ b/deps/rabbitmq_federation/test/unit_inbroker_SUITE.erl
@@ -0,0 +1,230 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(unit_inbroker_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-include("rabbit_federation.hrl").
+
+-compile(export_all).
+
+-define(US_NAME, <<"upstream">>).
+-define(DS_NAME, <<"fed.downstream">>).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ serialisation,
+ scratch_space,
+ remove_credentials,
+ get_connection_name,
+ upstream_validation,
+ upstream_set_validation
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, ?MODULE}
+ ]),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Test that we apply binding changes in the correct order even when
+%% they arrive out of order.
+serialisation(Config) ->
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, serialisation1, []).
+
+serialisation1() ->
+ with_exchanges(
+ fun(X) ->
+ [B1, B2, B3] = [b(K) || K <- [<<"1">>, <<"2">>, <<"3">>]],
+ remove_bindings(4, X, [B1, B3]),
+ add_binding(5, X, B1),
+ add_binding(1, X, B1),
+ add_binding(2, X, B2),
+ add_binding(3, X, B3),
+ %% List of lists because one for each link
+ Keys = rabbit_federation_exchange_link:list_routing_keys(
+ X#exchange.name),
+ [[<<"1">>, <<"2">>]] =:= Keys
+ end).
+
+scratch_space(Config) ->
+ ok = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, scratch_space1, []).
+
+scratch_space1() ->
+ A = <<"A">>,
+ B = <<"B">>,
+ DB = rabbit_federation_db,
+ with_exchanges(
+ fun(#exchange{name = N}) ->
+ DB:set_active_suffix(N, upstream(x), A),
+ DB:set_active_suffix(N, upstream(y), A),
+ DB:prune_scratch(N, [upstream(y), upstream(z)]),
+ DB:set_active_suffix(N, upstream(y), B),
+ DB:set_active_suffix(N, upstream(z), A),
+ none = DB:get_active_suffix(N, upstream(x), none),
+ B = DB:get_active_suffix(N, upstream(y), none),
+ A = DB:get_active_suffix(N, upstream(z), none)
+ end).
+
+remove_credentials(Config) ->
+ Test0 = fun (In, Exp) ->
+ Act = rabbit_ct_broker_helpers:rpc(Config, 0,
+ rabbit_federation_upstream, remove_credentials, [In]),
+ Exp = Act
+ end,
+ Cat = fun (Bs) ->
+ list_to_binary(lists:append([binary_to_list(B) || B <- Bs]))
+ end,
+ Test = fun (Scheme, Rest) ->
+ Exp = Cat([Scheme, Rest]),
+ Test0(Exp, Exp),
+ Test0(Cat([Scheme, <<"user@">>, Rest]), Exp),
+ Test0(Cat([Scheme, <<"user:pass@">>, Rest]), Exp)
+ end,
+ Test(<<"amqp://">>, <<"">>),
+ Test(<<"amqp://">>, <<"localhost">>),
+ Test(<<"amqp://">>, <<"localhost/">>),
+ Test(<<"amqp://">>, <<"localhost/foo">>),
+ Test(<<"amqp://">>, <<"localhost:5672">>),
+ Test(<<"amqp://">>, <<"localhost:5672/foo">>),
+ Test(<<"amqps://">>, <<"localhost:5672/%2f">>),
+ ok.
+
+get_connection_name(Config) ->
+ Amqqueue = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ amqqueue, new, [rabbit_misc:r(<<"/">>, queue, <<"queue">>),
+ self(),
+ false,
+ false,
+ none,
+ [],
+ undefined,
+ #{},
+ classic]),
+ AmqqueueWithPolicy = amqqueue:set_policy(Amqqueue, [{name, <<"my.federation.policy">>}]),
+ AmqqueueWithEmptyPolicy = amqqueue:set_policy(Amqqueue, []),
+
+
+ <<"Federation link (upstream: my.upstream, policy: my.federation.policy)">> = rabbit_federation_link_util:get_connection_name(
+ #upstream{name = <<"my.upstream">>},
+ #upstream_params{x_or_q = AmqqueueWithPolicy}
+ ),
+ <<"Federation link (upstream: my.upstream, policy: my.federation.policy)">> = rabbit_federation_link_util:get_connection_name(
+ #upstream{name = <<"my.upstream">>},
+ #upstream_params{x_or_q = #exchange{policy = [{name, <<"my.federation.policy">>}]}}
+ ),
+ <<"Federation link">> = rabbit_federation_link_util:get_connection_name(
+ #upstream{},
+ #upstream_params{x_or_q = AmqqueueWithEmptyPolicy}
+ ),
+ <<"Federation link">> = rabbit_federation_link_util:get_connection_name(
+ #upstream{},
+ #upstream_params{x_or_q = #exchange{policy = []}}
+ ),
+ <<"Federation link">> = rabbit_federation_link_util:get_connection_name(
+ whatever,
+ whatever
+ ),
+ ok.
+
+upstream_set_validation(_Config) ->
+ ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream-set">>,
+ <<"a-name">>,
+ [[{<<"upstream">>, <<"devtest1">>}],
+ [{<<"upstream">>, <<"devtest2">>}]],
+ <<"acting-user">>),
+ [[ok], [ok]]),
+ ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream-set">>,
+ <<"a-name">>,
+ [#{<<"upstream">> => <<"devtest3">>},
+ #{<<"upstream">> => <<"devtest4">>}],
+ <<"acting-user">>),
+ [[ok], [ok]]),
+ ok.
+
+upstream_validation(_Config) ->
+ ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream">>,
+ <<"a-name">>,
+ [{<<"uri">>, <<"amqp://127.0.0.1/%2f">>}],
+ <<"acting-user">>),
+ [ok]),
+ ?assertEqual(rabbit_federation_parameters:validate(<<"/">>, <<"federation-upstream">>,
+ <<"a-name">>,
+ #{<<"uri">> => <<"amqp://127.0.0.1/%2f">>},
+ <<"acting-user">>),
+ [ok]),
+ ok.
+
+with_exchanges(Fun) ->
+ rabbit_exchange:declare(r(?US_NAME), fanout, false, false, false, [],
+ <<"acting-user">>),
+ X = rabbit_exchange:declare(r(?DS_NAME), fanout, false, false, false, [],
+ <<"acting-user">>),
+ Fun(X),
+ %% Delete downstream first or it will recreate the upstream
+ rabbit_exchange:delete(r(?DS_NAME), false, <<"acting-user">>),
+ rabbit_exchange:delete(r(?US_NAME), false, <<"acting-user">>),
+ ok.
+
+add_binding(Ser, X, B) ->
+ rabbit_federation_exchange:add_binding(transaction, X, B),
+ rabbit_federation_exchange:add_binding(Ser, X, B).
+
+remove_bindings(Ser, X, Bs) ->
+ rabbit_federation_exchange:remove_bindings(transaction, X, Bs),
+ rabbit_federation_exchange:remove_bindings(Ser, X, Bs).
+
+r(Name) -> rabbit_misc:r(<<"/">>, exchange, Name).
+
+b(Key) ->
+ #binding{source = ?DS_NAME, destination = <<"whatever">>,
+ key = Key, args = []}.
+
+upstream(UpstreamName) ->
+ #upstream{name = atom_to_list(UpstreamName),
+ exchange_name = <<"upstream">>}.