diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-06-28 02:20:56 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-06-28 02:20:56 +0300 |
| commit | dd25bf9445eccdf148dfbd9fdfe59180b1f6b1d4 (patch) | |
| tree | 4aa40738d4d72ddf7b89ccf5e623ff0a74ab926e | |
| parent | ebd33bab54474ea13653a2861314e93151aad4bc (diff) | |
| parent | b6aaf53513ae5b8d909a714f53389fc5bd3445c8 (diff) | |
| download | rabbitmq-server-git-dd25bf9445eccdf148dfbd9fdfe59180b1f6b1d4.tar.gz | |
Merge pull request #857 from rabbitmq/rabbitmq-server-802rabbitmq_v3_6_3_rc1
Infinity loop in synchronisation of priority queues
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 7 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 227 | ||||
| -rw-r--r-- | test/priority_queue_recovery_SUITE.erl | 153 |
4 files changed, 310 insertions, 79 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9edb99c4d7..c04c82f45e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -120,7 +120,7 @@ handle_go(Q = #amqqueue{name = QName}) -> Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), Q1 = Q #amqqueue { pid = QPid }, - ok = rabbit_queue_index:erase(QName), %% For crash recovery + _ = BQ:delete_crashed(Q), %% For crash recovery BQS = bq_init(BQ, Q1, new), State = #state { q = Q1, gm = GM, diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index ae8a38daf0..b7a3afd129 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -43,7 +43,7 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4]). --record(state, {bq, bqss}). +-record(state, {bq, bqss, max_priority}). -record(passthrough, {bq, bqs}). %% See 'note on suffixes' below @@ -157,7 +157,8 @@ init(Q, Recover, AsyncCallback) -> [{P, Init(P, Term)} || {P, Term} <- PsTerms] end, #state{bq = BQ, - bqss = BQSs} + bqss = BQSs, + max_priority = hd(Ps)} end. %% [0] collapse_recovery has the effect of making a list of recovery %% terms in priority order, even for non priority queues. It's easier @@ -419,6 +420,8 @@ info(Item, #passthrough{bq = BQ, bqs = BQS}) -> invoke(Mod, {P, Fun}, State = #state{bq = BQ}) -> pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State); +invoke(Mod, Fun, State = #state{bq = BQ, max_priority = P}) -> + pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State); invoke(Mod, Fun, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(invoke(Mod, Fun, BQS)). diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 56b44d423e..46fafd89f7 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -29,32 +29,28 @@ all() -> groups() -> [ - {cluster_size_2, [], [ - {parallel_tests, [parallel], [ - ackfold, - drop, - dropwhile_fetchwhile, - info_head_message_timestamp, - matching, - mirror_queue_sync, - mirror_queue_sync_priority_above_max, - mirror_queue_sync_priority_above_max_pending_ack, - mirror_queue_sync_order, - purge, - requeue, - resume, - simple_order, - straight_through - ]}, - {non_parallel_tests, [], [ - recovery %% Restart RabbitMQ. - ]} - ]}, - {cluster_size_3, [], [ - {parallel_tests, [parallel], [ - mirror_queue_auto_ack - ]} - ]} + {cluster_size_2, [], [ + ackfold, + drop, + dropwhile_fetchwhile, + info_head_message_timestamp, + matching, + mirror_queue_sync, + mirror_queue_sync_priority_above_max, + mirror_queue_sync_priority_above_max_pending_ack, + mirror_queue_sync_order, + purge, + requeue, + resume, + simple_order, + straight_through, + invoke + ]}, + {cluster_size_3, [], [ + mirror_queue_auto_ack, + mirror_fast_reset_policy, + mirror_reset_policy + ]} ]. %% ------------------------------------------------------------------- @@ -69,35 +65,35 @@ end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). init_per_group(cluster_size_2, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodes_count, 2} + {rmq_nodes_count, 2}, + {rmq_nodename_suffix, Suffix} ]), rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()); init_per_group(cluster_size_3, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodes_count, 3} + {rmq_nodes_count, 3}, + {rmq_nodename_suffix, Suffix} ]), rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); -init_per_group(_, Config) -> - Config. + rabbit_ct_client_helpers:setup_steps()). -end_per_group(ClusterSizeGroup, Config) -when ClusterSizeGroup =:= cluster_size_2 -orelse ClusterSizeGroup =:= cluster_size_3 -> +end_per_group(_Group, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()); -end_per_group(_, Config) -> - Config. + rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:setup_steps(), rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:teardown_steps(), rabbit_ct_helpers:testcase_finished(Config, Testcase). %% ------------------------------------------------------------------- @@ -135,25 +131,8 @@ end_per_testcase(Testcase, Config) -> %% %% [0] publish enough to get credit flow from msg store -recovery(Config) -> - {Conn, Ch} = open(Config), - Q = <<"recovery-queue">>, - declare(Ch, Q, 3), - publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), - amqp_connection:close(Conn), - - %% TODO This terminates the automatically open connection and breaks - %% coverage. - rabbit_ct_broker_helpers:restart_broker(Config, 0), - - {Conn2, Ch2} = open(Config), - get_all(Ch2, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]), - delete(Ch2, Q), - amqp_connection:close(Conn2), - passed. - simple_order(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"simple_order-queue">>, declare(Ch, Q, 3), publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), @@ -164,10 +143,11 @@ simple_order(Config) -> get_all(Ch, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]), delete(Ch, Q), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. matching(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"matching-queue">>, declare(Ch, Q, 5), %% We round priority down, and 0 is the default @@ -175,10 +155,11 @@ matching(Config) -> get_all(Ch, Q, do_ack, [5, 10, undefined, 0, undefined]), delete(Ch, Q), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. resume(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"resume-queue">>, declare(Ch, Q, 5), amqp_channel:call(Ch, #'confirm.select'{}), @@ -187,10 +168,11 @@ resume(Config) -> amqp_channel:call(Ch, #'queue.purge'{queue = Q}), %% Assert it exists delete(Ch, Q), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. straight_through(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"straight_through-queue">>, declare(Ch, Q, 3), [begin @@ -204,10 +186,35 @@ straight_through(Config) -> get_empty(Ch, Q), delete(Ch, Q), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + +invoke(Config) -> + %% Synthetic test to check the invoke callback, as the bug tested here + %% is only triggered with a race condition. + %% When mirroring is stopped, the backing queue of rabbit_amqqueue_process + %% changes from rabbit_mirror_queue_master to rabbit_priority_queue, + %% which shouldn't receive any invoke call. However, there might + %% be pending messages so the priority queue receives the + %% `run_backing_queue` cast message sent to the old master. + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), + Q = <<"invoke-queue">>, + declare(Ch, Q, 3), + Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + rabbit_ct_broker_helpers:rpc( + Config, A, gen_server, cast, + [Pid, + {run_backing_queue, ?MODULE, fun(_, _) -> ok end}]), + Pid2 = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + Pid = Pid2, + delete(Ch, Q), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. dropwhile_fetchwhile(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"dropwhile_fetchwhile-queue">>, [begin declare(Ch, Q, Args ++ arguments(3)), @@ -221,10 +228,11 @@ dropwhile_fetchwhile(Config) -> {<<"x-dead-letter-exchange">>, longstr, <<"amq.fanout">>}] ]], rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. ackfold(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"ackfolq-queue1">>, Q2 = <<"ackfold-queue2">>, declare(Ch, Q, @@ -242,10 +250,11 @@ ackfold(Config) -> delete(Ch, Q), delete(Ch, Q2), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. requeue(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"requeue-queue">>, declare(Ch, Q, 3), publish(Ch, Q, [1, 2, 3]), @@ -256,10 +265,11 @@ requeue(Config) -> get_all(Ch, Q, do_ack, [3, 2, 1]), delete(Ch, Q), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. drop(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"drop-queue">>, declare(Ch, Q, [{<<"x-max-length">>, long, 4} | arguments(3)]), publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), @@ -268,10 +278,11 @@ drop(Config) -> get_all(Ch, Q, do_ack, [2, 1, 1, 1]), delete(Ch, Q), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. purge(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"purge-queue">>, declare(Ch, Q, 3), publish(Ch, Q, [1, 2, 3]), @@ -279,6 +290,7 @@ purge(Config) -> get_empty(Ch, Q), delete(Ch, Q), rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. info_head_message_timestamp(Config) -> @@ -349,7 +361,7 @@ ram_duration(_Config) -> passed. mirror_queue_sync(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q = <<"mirror_queue_sync-queue">>, declare(Ch, Q, 3), publish(Ch, Q, [1, 2, 3]), @@ -363,13 +375,14 @@ mirror_queue_sync(Config) -> rabbit_ct_broker_helpers:control_action(sync_queue, Nodename0, [binary_to_list(Q)], [{"-p", "/"}]), wait_for_sync(Config, Nodename0, rabbit_misc:r(<<"/">>, queue, Q)), + rabbit_ct_client_helpers:close_connection(Conn), passed. mirror_queue_sync_priority_above_max(Config) -> A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), %% Tests synchronisation of slaves when priority is higher than max priority. %% This causes an infinity loop (and test timeout) before rabbitmq-server-795 - Ch = rabbit_ct_client_helpers:open_channel(Config, A), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), Q = <<"mirror_queue_sync_priority_above_max-queue">>, declare(Ch, Q, 3), publish(Ch, Q, [5, 5, 5]), @@ -379,6 +392,7 @@ mirror_queue_sync_priority_above_max(Config) -> [binary_to_list(Q)], [{"-p", "/"}]), wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), delete(Ch, Q), + rabbit_ct_client_helpers:close_connection(Conn), passed. mirror_queue_sync_priority_above_max_pending_ack(Config) -> @@ -386,7 +400,7 @@ mirror_queue_sync_priority_above_max_pending_ack(Config) -> %% Tests synchronisation of slaves when priority is higher than max priority %% and there are pending acks. %% This causes an infinity loop (and test timeout) before rabbitmq-server-795 - Ch = rabbit_ct_client_helpers:open_channel(Config, A), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), Q = <<"mirror_queue_sync_priority_above_max_pending_ack-queue">>, declare(Ch, Q, 3), publish(Ch, Q, [5, 5, 5]), @@ -401,6 +415,7 @@ mirror_queue_sync_priority_above_max_pending_ack(Config) -> synced_msgs(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 3), synced_msgs(Config, B, rabbit_misc:r(<<"/">>, queue, Q), 3), delete(Ch, Q), + rabbit_ct_client_helpers:close_connection(Conn), passed. mirror_queue_auto_ack(Config) -> @@ -410,7 +425,7 @@ mirror_queue_auto_ack(Config) -> %% the slaves will crash with the depth notification as they will not %% match the master delta. %% Bug rabbitmq-server 687 - Ch = rabbit_ct_client_helpers:open_channel(Config, A), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), Q = <<"mirror_queue_auto_ack-queue">>, declare(Ch, Q, 3), publish(Ch, Q, [1, 2, 3]), @@ -431,13 +446,15 @@ mirror_queue_auto_ack(Config) -> SPid2 = proplists:get_value(SNode2, Slaves), delete(Ch, Q), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), passed. mirror_queue_sync_order(Config) -> A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, A), - Ch2 = rabbit_ct_client_helpers:open_channel(Config, B), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), + {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, B), Q = <<"mirror_queue_sync_order-queue">>, declare(Ch, Q, 3), publish_payload(Ch, Q, [{1, <<"msg1">>}, {2, <<"msg2">>}, @@ -459,11 +476,54 @@ mirror_queue_sync_order(Config) -> <<"msg4">>, <<"msg1">>]), delete(Ch2, Q), + rabbit_ct_broker_helpers:start_node(Config, A), + rabbit_ct_client_helpers:close_connection(Conn), + rabbit_ct_client_helpers:close_connection(Conn2), passed. -%%---------------------------------------------------------------------------- -open(Config) -> - rabbit_ct_client_helpers:open_connection_and_channel(Config, 0). +mirror_reset_policy(Config) -> + %% Gives time to the master to go through all stages. + %% Might eventually trigger some race conditions from #802, + %% although for that I would expect a longer run and higher + %% number of messages in the system. + mirror_reset_policy(Config, 5000). + +mirror_fast_reset_policy(Config) -> + %% This test seems to trigger the bug tested in invoke/1, but it + %% cannot guarantee it will always happen. Thus, both tests + %% should stay in the test suite. + mirror_reset_policy(Config, 5). + + +mirror_reset_policy(Config, Wait) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A), + Q = <<"mirror_reset_policy-queue">>, + declare(Ch, Q, 5), + Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + publish_many(Ch, Q, 20000), + [begin + rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]), + timer:sleep(Wait), + rabbit_ct_broker_helpers:clear_policy( + Config, A, <<"^mirror_reset_policy-queue$">>), + timer:sleep(Wait) + end || _ <- lists:seq(1, 10)], + timer:sleep(1000), + ok = rabbit_ct_broker_helpers:set_ha_policy( + Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>, + [{<<"ha-sync-mode">>, <<"automatic">>}]), + wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2), + %% Verify master has not crashed + Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)), + delete(Ch, Q), + + rabbit_ct_client_helpers:close_connection(Conn), + passed. + +%%---------------------------------------------------------------------------- declare(Ch, Q, Args) when is_list(Args) -> amqp_channel:call(Ch, #'queue.declare'{queue = Q, @@ -564,18 +624,26 @@ priority2bin(Int) -> list_to_binary(integer_to_list(Int)). %%---------------------------------------------------------------------------- wait_for_sync(Config, Nodename, Q) -> - case synced(Config, Nodename, Q) of + wait_for_sync(Config, Nodename, Q, 1). + +wait_for_sync(Config, Nodename, Q, Nodes) -> + wait_for_sync(Config, Nodename, Q, Nodes, 600). + +wait_for_sync(_, _, _, _, 0) -> + throw(sync_timeout); +wait_for_sync(Config, Nodename, Q, Nodes, N) -> + case synced(Config, Nodename, Q, Nodes) of true -> ok; false -> timer:sleep(100), - wait_for_sync(Config, Nodename, Q) + wait_for_sync(Config, Nodename, Q, Nodes, N-1) end. -synced(Config, Nodename, Q) -> +synced(Config, Nodename, Q, Nodes) -> Info = rabbit_ct_broker_helpers:rpc(Config, Nodename, rabbit_amqqueue, info_all, [<<"/">>, [name, synchronised_slave_pids]]), [SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info, Q =:= Q1], - length(SSPids) =:= 1. + length(SSPids) =:= Nodes. synced_msgs(Config, Nodename, Q, Expected) -> Info = rabbit_ct_broker_helpers:rpc(Config, Nodename, @@ -593,4 +661,11 @@ slave_pids(Config, Nodename, Q) -> Q =:= Q1], SPids. +queue_pid(Config, Nodename, Q) -> + Info = rabbit_ct_broker_helpers:rpc( + Config, Nodename, + rabbit_amqqueue, info_all, [<<"/">>, [name, pid]]), + [Pid] = [P || [{name, Q1}, {pid, P}] <- Info, Q =:= Q1], + Pid. + %%---------------------------------------------------------------------------- diff --git a/test/priority_queue_recovery_SUITE.erl b/test/priority_queue_recovery_SUITE.erl new file mode 100644 index 0000000000..9e2ffbd3fe --- /dev/null +++ b/test/priority_queue_recovery_SUITE.erl @@ -0,0 +1,153 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(priority_queue_recovery_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + recovery %% Restart RabbitMQ. + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(_, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 2} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +recovery(Config) -> + {Conn, Ch} = open(Config), + Q = <<"recovery-queue">>, + declare(Ch, Q, 3), + publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), + rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection(Conn), + + rabbit_ct_broker_helpers:restart_broker(Config, 0), + + {Conn2, Ch2} = open(Config, 1), + get_all(Ch2, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]), + delete(Ch2, Q), + rabbit_ct_client_helpers:close_channel(Ch2), + rabbit_ct_client_helpers:close_connection(Conn2), + passed. + + +%%---------------------------------------------------------------------------- + +open(Config) -> + open(Config, 0). + +open(Config, NodeIndex) -> + rabbit_ct_client_helpers:open_connection_and_channel(Config, NodeIndex). + +declare(Ch, Q, Args) when is_list(Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + arguments = Args}); +declare(Ch, Q, Max) -> + declare(Ch, Q, arguments(Max)). + +delete(Ch, Q) -> + amqp_channel:call(Ch, #'queue.delete'{queue = Q}). + +publish(Ch, Q, Ps) -> + amqp_channel:call(Ch, #'confirm.select'{}), + [publish1(Ch, Q, P) || P <- Ps], + amqp_channel:wait_for_confirms(Ch). + +publish1(Ch, Q, P) -> + amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q}, + #amqp_msg{props = props(P), + payload = priority2bin(P)}). + +publish1(Ch, Q, P, Pd) -> + amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q}, + #amqp_msg{props = props(P), + payload = Pd}). + +get_all(Ch, Q, Ack, Ps) -> + DTags = get_partial(Ch, Q, Ack, Ps), + get_empty(Ch, Q), + DTags. + +get_partial(Ch, Q, Ack, Ps) -> + [get_ok(Ch, Q, Ack, priority2bin(P)) || P <- Ps]. + +get_empty(Ch, Q) -> + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}). + +get_ok(Ch, Q, Ack, PBin) -> + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} = + amqp_channel:call(Ch, #'basic.get'{queue = Q, + no_ack = Ack =:= no_ack}), + PBin = PBin2, + maybe_ack(Ch, Ack, DTag). + +maybe_ack(Ch, do_ack, DTag) -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}), + DTag; +maybe_ack(_Ch, _, DTag) -> + DTag. + +arguments(none) -> []; +arguments(Max) -> [{<<"x-max-priority">>, byte, Max}]. + +priority2bin(undefined) -> <<"undefined">>; +priority2bin(Int) -> list_to_binary(integer_to_list(Int)). + +props(undefined) -> #'P_basic'{delivery_mode = 2}; +props(P) -> #'P_basic'{priority = P, + delivery_mode = 2}. |
