diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2020-05-12 17:24:29 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-12 17:24:29 +0200 |
| commit | 8120143c7c56aa17a93797923a7379daf2e26e97 (patch) | |
| tree | d45838b526e91ac6d43215b01d25e70f244db122 | |
| parent | 776278c05cdf88b7561f828e794ab06083ca2b79 (diff) | |
| parent | 056f665c615e2ad47c0e59ab24b4f07f992dad89 (diff) | |
| download | rabbitmq-server-git-8120143c7c56aa17a93797923a7379daf2e26e97.tar.gz | |
Merge pull request #2328 from rabbitmq/revert-rabbitmq-server-2308
Revert rabbitmq server 2308
| -rw-r--r-- | test/backing_queue_SUITE.erl | 40 | ||||
| -rw-r--r-- | test/confirms_rejects_SUITE.erl | 102 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 48 | ||||
| -rw-r--r-- | test/publisher_confirms_parallel_SUITE.erl | 45 | ||||
| -rw-r--r-- | test/queue_master_location_SUITE.erl | 11 | ||||
| -rw-r--r-- | test/simple_ha_SUITE.erl | 54 | ||||
| -rw-r--r-- | test/unit_gm_SUITE.erl | 27 |
7 files changed, 272 insertions, 55 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 49e53c6c01..40f344324d 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -42,6 +42,7 @@ variable_queue_purge, variable_queue_requeue, variable_queue_requeue_ram_beta, + variable_queue_fold, variable_queue_batch_publish, variable_queue_batch_publish_delivered ]). @@ -175,7 +176,8 @@ orelse Group =:= backing_queue_embed_limit_1024 -> end_per_group1(_, Config) -> Config. -init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue -> +init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue; + Testcase == variable_queue_fold -> ok = rabbit_ct_broker_helpers:rpc( Config, 0, application, set_env, [rabbit, queue_explicit_gc_run_operation_threshold, 0]), @@ -183,7 +185,8 @@ init_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue -> init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). -end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue -> +end_per_testcase(Testcase, Config) when Testcase == variable_queue_requeue; + Testcase == variable_queue_fold -> ok = rabbit_ct_broker_helpers:rpc( Config, 0, application, set_env, [rabbit, queue_explicit_gc_run_operation_threshold, 1000]), @@ -1147,6 +1150,39 @@ variable_queue_requeue_ram_beta2(VQ0, _Config) -> {_, VQ8} = rabbit_variable_queue:ack(AcksAll, VQ7), VQ8. +variable_queue_fold(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, variable_queue_fold1, [Config]). + +variable_queue_fold1(Config) -> + with_fresh_variable_queue( + fun variable_queue_fold2/2, + ?config(variable_queue_type, Config)). + +variable_queue_fold2(VQ0, _Config) -> + {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Count = rabbit_variable_queue:depth(VQ1), + Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), + lists:foldl(fun (Cut, VQ2) -> + test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) + end, VQ1, [0, 1, 2, Count div 2, + Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> + {Acc, VQ1} = rabbit_variable_queue:fold( + fun (M, _, Pending, A) -> + MInt = msg2int(M), + Pending = lists:member(MInt, PendingMsgs), %% assert + case MInt =< Cut of + true -> {cont, [MInt | A]}; + false -> {stop, A} + end + end, [], VQ0), + Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs), + Expected = lists:reverse(Acc), %% assertion + VQ1. + variable_queue_batch_publish(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, variable_queue_batch_publish1, [Config]). diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index 56d95df878..aaaeb4a939 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -18,7 +18,9 @@ groups() -> [ {parallel_tests, [parallel], [ {overflow_reject_publish_dlx, [parallel], OverflowTests}, - {overflow_reject_publish, [parallel], OverflowTests} + {overflow_reject_publish, [parallel], OverflowTests}, + dead_queue_rejects, + mixed_dead_alive_queues_reject ]} ]. @@ -62,7 +64,9 @@ init_per_testcase(policy_resets_to_default = Testcase, Config) -> rabbit_ct_helpers:testcase_started( rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase); init_per_testcase(Testcase, Config) - when Testcase == confirms_rejects_conflict -> + when Testcase == confirms_rejects_conflict; + Testcase == dead_queue_rejects; + Testcase == mixed_dead_alive_queues_reject -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -87,6 +91,16 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> XOverflow = ?config(overflow, Config), QueueName = <<"confirms_rejects_conflict", "_", XOverflow/binary>>, amqp_channel:call(Ch, #'queue.delete'{queue = QueueName}), + end_per_testcase0(Testcase, Config); +end_per_testcase(dead_queue_rejects = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>}), + end_per_testcase0(Testcase, Config); +end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_alive">>}), + amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"mixed_dead_alive_queues_reject">>}), end_per_testcase0(Testcase, Config). end_per_testcase0(Testcase, Config) -> @@ -102,6 +116,90 @@ end_per_testcase0(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). +dead_queue_rejects(Config) -> + Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), + QueueName = <<"dead_queue_rejects">>, + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + durable = true}), + + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> ok + after 10000 -> + error(timeout_waiting_for_initial_ack) + end, + + BasicPublish = #'basic.publish'{routing_key = QueueName}, + AmqpMsg = #amqp_msg{payload = <<"HI">>}, + kill_queue_expect_nack(Config, Ch, QueueName, BasicPublish, AmqpMsg, 5). + +mixed_dead_alive_queues_reject(Config) -> + Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), + QueueNameDead = <<"mixed_dead_alive_queues_reject_dead">>, + QueueNameAlive = <<"mixed_dead_alive_queues_reject_alive">>, + ExchangeName = <<"mixed_dead_alive_queues_reject">>, + + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameDead, + durable = true}), + amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameAlive, + durable = true}), + + amqp_channel:call(Ch, #'exchange.declare'{exchange = ExchangeName, + durable = true}), + + amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName, + queue = QueueNameAlive, + routing_key = <<"route">>}), + + amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName, + queue = QueueNameDead, + routing_key = <<"route">>}), + + amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName, + routing_key = <<"route">>}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> ok; + {'basic.nack',_,_,_} -> error(expecting_ack_got_nack) + after 50000 -> + error({timeout_waiting_for_initial_ack, process_info(self(), messages)}) + end, + + BasicPublish = #'basic.publish'{exchange = ExchangeName, routing_key = <<"route">>}, + AmqpMsg = #amqp_msg{payload = <<"HI">>}, + kill_queue_expect_nack(Config, Ch, QueueNameDead, BasicPublish, AmqpMsg, 5). + +kill_queue_expect_nack(_Config, _Ch, _QueueName, _BasicPublish, _AmqpMsg, 0) -> + error(expecting_nack_got_ack); +kill_queue_expect_nack(Config, Ch, QueueName, BasicPublish, AmqpMsg, Tries) -> + kill_the_queue(QueueName, Config), + amqp_channel:cast(Ch, BasicPublish, AmqpMsg), + R = receive + {'basic.nack',_,_,_} -> + ok; + {'basic.ack',_,_} -> + retry + after 10000 -> + error({timeout_waiting_for_nack, process_info(self(), messages)}) + end, + case R of + ok -> + ok; + retry -> + kill_queue_expect_nack(Config, Ch, QueueName, BasicPublish, AmqpMsg, Tries - 1) + end. + confirms_rejects_conflict(Config) -> Conn = ?config(conn, Config), Conn1 = ?config(conn1, Config), diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 698646d9c3..d5e9700b66 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -225,8 +225,7 @@ change_policy(Config) -> ok. change_cluster(Config) -> - [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(Config, - nodename), + [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), rabbit_ct_broker_helpers:cluster_nodes(Config, [A, B, C]), ACh = rabbit_ct_client_helpers:open_channel(Config, A), @@ -234,17 +233,16 @@ change_cluster(Config) -> assert_slaves(A, ?QNAME, {A, ''}), %% Give it policy exactly 4, it should mirror to all 3 nodes - rabbit_ct_broker_helpers:set_ha_policy(Config, A, ?POLICY, - {<<"exactly">>, 4}), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, ?POLICY, {<<"exactly">>, 4}), assert_slaves(A, ?QNAME, {A, [B, C]}, [{A, []}, {A, [B]}, {A, [C]}]), - %% Add D and E, D joins in + %% Add D and E, D or E joins in rabbit_ct_broker_helpers:cluster_nodes(Config, [A, D, E]), - assert_slaves(A, ?QNAME, {A, [B, C, D]}, [{A, [B, C]}]), + assert_slaves(A, ?QNAME, [{A, [B, C, D]}, {A, [B, C, E]}], [{A, [B, C]}]), - %% Remove D, E joins in + %% Remove one, the other joins in rabbit_ct_broker_helpers:stop_node(Config, D), - assert_slaves(A, ?QNAME, {A, [B, C, E]}, [{A, [B, C]}]), + assert_slaves(A, ?QNAME, [{A, [B, C, D]}, {A, [B, C, E]}], [{A, [B, C]}]), ok. @@ -805,9 +803,23 @@ assert_slaves(RPCNode, QName, Exp, PermittedIntermediate) -> [{get(previous_exp_m_node), get(previous_exp_s_nodes)} | PermittedIntermediate], 1000). -assert_slaves0(_, _, _, _, 0) -> - error(give_up_waiting_for_slaves); +assert_slaves0(_RPCNode, _QName, [], _PermittedIntermediate, _Attempts) -> + error(invalid_expectation); +assert_slaves0(RPCNode, QName, [{ExpMNode, ExpSNodes}|T], PermittedIntermediate, Attempts) -> + case assert_slaves1(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts, nofail) of + ok -> + ok; + failed -> + assert_slaves0(RPCNode, QName, T, PermittedIntermediate, Attempts - 1) + end; assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts) -> + assert_slaves1(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts, fail). + +assert_slaves1(_RPCNode, _QName, _Exp, _PermittedIntermediate, 0, fail) -> + error(give_up_waiting_for_slaves); +assert_slaves1(_RPCNode, _QName, _Exp, _PermittedIntermediate, 0, nofail) -> + failed; +assert_slaves1(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Attempts, FastFail) -> Q = find_queue(QName, RPCNode), Pid = proplists:get_value(pid, Q), SPids = proplists:get_value(slave_pids, Q), @@ -825,16 +837,22 @@ assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, Att case [{PermMNode, PermSNodes} || {PermMNode, PermSNodes} <- PermittedIntermediate, PermMNode =:= ActMNode, equal_list(PermSNodes, ActSNodes)] of - [] -> ct:fail("Expected ~p / ~p, got ~p / ~p~nat ~p~n", - [ExpMNode, ExpSNodes, ActMNode, ActSNodes, - get_stacktrace()]); + [] -> + case FastFail of + fail -> + ct:fail("Expected ~p / ~p, got ~p / ~p~nat ~p~n", + [ExpMNode, ExpSNodes, ActMNode, ActSNodes, + get_stacktrace()]); + nofail -> + failed + end; State -> ct:pal("Waiting to leave state ~p~n Waiting for ~p~n", [State, {ExpMNode, ExpSNodes}]), timer:sleep(200), - assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, + assert_slaves1(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate, - Attempts - 1) + Attempts - 1, FastFail) end; true -> put(previous_exp_m_node, ExpMNode), diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl index 824760e9e5..8e90452b0d 100644 --- a/test/publisher_confirms_parallel_SUITE.erl +++ b/test/publisher_confirms_parallel_SUITE.erl @@ -35,43 +35,20 @@ all() -> ]. groups() -> + PublisherConfirmTests = [publisher_confirms, + publisher_confirms_with_deleted_queue, + confirm_select_ok, + confirm_nowait, + confirm_ack, + confirm_acks, + confirm_mandatory_unroutable, + confirm_unroutable_message], [ {publisher_confirm_tests, [], [ - {classic_queue, - [parallel], - [publisher_confirms, - publisher_confirms_with_deleted_queue, - confirm_select_ok, - confirm_nowait, - confirm_ack, - confirm_acks, - confirm_mandatory_unroutable, - confirm_unroutable_message, - confirm_nack] - }, - {mirrored_queue, - [parallel], - [publisher_confirms_with_deleted_queue, - confirm_select_ok, - confirm_nowait, - confirm_ack, - confirm_acks, - confirm_mandatory_unroutable, - confirm_unroutable_message] - }, - {quorum_queue, - [parallel], - [publisher_confirms, - publisher_confirms_with_deleted_queue, - confirm_select_ok, - confirm_nowait, - confirm_ack, - confirm_acks, - confirm_mandatory_unroutable, - confirm_unroutable_message, - confirm_minority] - } + {classic_queue, [parallel], PublisherConfirmTests ++ [confirm_nack]}, + {mirrored_queue, [parallel], PublisherConfirmTests ++ [confirm_nack]}, + {quorum_queue, [parallel], PublisherConfirmTests ++ [confirm_minority]} ]} ]. diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl index 1049a263c8..694b2e5d11 100644 --- a/test/queue_master_location_SUITE.erl +++ b/test/queue_master_location_SUITE.erl @@ -58,7 +58,8 @@ groups() -> declare_config, calculate_min_master, calculate_min_master_with_bindings, - calculate_random + calculate_random, + calculate_client_local ]} ]. @@ -248,6 +249,14 @@ calculate_random(Config) -> verify_random(Config, Q), ok. +calculate_client_local(Config) -> + setup_test_environment(Config), + QueueName = rabbit_misc:r(<<"/">>, queue, Q = <<"qm.test">>), + Args = [{<<"x-queue-master-locator">>, longstr, <<"client-local">>}], + declare(Config, QueueName, false, false, Args, none), + verify_client_local(Config, Q), + ok. + %% %% Setup environment %% diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index 5eae4b062c..567f43fe67 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -31,6 +31,10 @@ all() -> ]. groups() -> + RejectTests = [ + rejects_survive_stop, + rejects_survive_policy + ], [ {cluster_size_2, [], [ rapid_redeclare, @@ -43,7 +47,10 @@ groups() -> consume_survives_policy, auto_resume, auto_resume_no_ccn_client, - confirms_survive_stop + confirms_survive_stop, + confirms_survive_policy, + {overflow_reject_publish, [], RejectTests}, + {overflow_reject_publish_dlx, [], RejectTests} ]} ]. @@ -65,6 +72,14 @@ init_per_group(cluster_size_2, Config) -> init_per_group(cluster_size_3, Config) -> rabbit_ct_helpers:set_config(Config, [ {rmq_nodes_count, 3} + ]); +init_per_group(overflow_reject_publish, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish">>} + ]); +init_per_group(overflow_reject_publish_dlx, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish-dlx">>} ]). end_per_group(_, Config) -> @@ -189,6 +204,10 @@ auto_resume_no_ccn_client(Cf) -> consume_survives(Cf, fun sigkill/2, false, false). confirms_survive_stop(Cf) -> confirms_survive(Cf, fun stop/2). +confirms_survive_policy(Cf) -> confirms_survive(Cf, fun policy/2). + +rejects_survive_stop(Cf) -> rejects_survive(Cf, fun stop/2). +rejects_survive_policy(Cf) -> rejects_survive(Cf, fun policy/2). %%---------------------------------------------------------------------------- @@ -247,6 +266,39 @@ confirms_survive(Config, DeathFun) -> rabbit_ha_test_producer:await_response(ProducerPid), ok. +rejects_survive(Config, DeathFun) -> + [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000), + Node1Channel = rabbit_ct_client_helpers:open_channel(Config, A), + Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B), + + %% declare the queue on the master, mirrored to the two slaves + XOverflow = ?config(overflow, Config), + Queue = <<"test_rejects", "_", XOverflow/binary>>, + amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue, + auto_delete = false, + durable = true, + arguments = [{<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, XOverflow}]}), + Payload = <<"there can be only one">>, + amqp_channel:call(Node1Channel, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{payload = Payload}), + + %% send a bunch of messages from the producer. Tolerating nacks. + ProducerPid = rabbit_ha_test_producer:create(Node2Channel, Queue, + self(), true, Msgs, nacks), + DeathFun(Config, A), + rabbit_ha_test_producer:await_response(ProducerPid), + + {#'basic.get_ok'{}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}), + %% There is only one message. + #'basic.get_empty'{} = amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}), + ok. + + + stop(Config, Node) -> rabbit_ct_broker_helpers:stop_node_after(Config, Node, 50). diff --git a/test/unit_gm_SUITE.erl b/test/unit_gm_SUITE.erl index 6724a767d2..a574753e8e 100644 --- a/test/unit_gm_SUITE.erl +++ b/test/unit_gm_SUITE.erl @@ -37,6 +37,7 @@ all() -> join_leave, broadcast, confirmed_broadcast, + member_death, receive_in_order, unexpected_msg, down_in_members_change @@ -73,6 +74,32 @@ broadcast(_Config) -> confirmed_broadcast(_Config) -> passed = do_broadcast(fun gm:confirmed_broadcast/2). +member_death(_Config) -> + passed = with_two_members( + fun (Pid, Pid2) -> + {ok, Pid3} = gm:start_link( + ?MODULE, ?MODULE, self(), + fun rabbit_misc:execute_mnesia_transaction/1), + passed = receive_joined(Pid3, [Pid, Pid2, Pid3], + timeout_joining_gm_group_3), + passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1), + passed = receive_birth(Pid2, Pid3, timeout_waiting_for_birth_3_2), + + unlink(Pid3), + exit(Pid3, kill), + + %% Have to do some broadcasts to ensure that all members + %% find out about the death. + BFun = broadcast_fun(fun gm:confirmed_broadcast/2), + passed = BFun(Pid, Pid2), + passed = BFun(Pid, Pid2), + + passed = receive_death(Pid, Pid3, timeout_waiting_for_death_3_1), + passed = receive_death(Pid2, Pid3, timeout_waiting_for_death_3_2), + + passed + end). + receive_in_order(_Config) -> passed = with_two_members( fun (Pid, Pid2) -> |
