summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-28 10:21:57 +0100
committerDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-28 10:21:57 +0100
commit16851f9ea5f727e6bf219def5816e394a8b7819c (patch)
tree82ab27d5f6971e0561a4fbeac3f760dab3b6f487
parentc824c9dd19341cdd0e8a48ff02d846abf9c9c313 (diff)
parentdd25bf9445eccdf148dfbd9fdfe59180b1f6b1d4 (diff)
downloadrabbitmq-server-git-16851f9ea5f727e6bf219def5816e394a8b7819c.tar.gz
Merge branch 'stable'
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_priority_queue.erl7
-rw-r--r--test/priority_queue_SUITE.erl227
-rw-r--r--test/priority_queue_recovery_SUITE.erl153
4 files changed, 310 insertions, 79 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9edb99c4d7..c04c82f45e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -120,7 +120,7 @@ handle_go(Q = #amqqueue{name = QName}) ->
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
Q1 = Q #amqqueue { pid = QPid },
- ok = rabbit_queue_index:erase(QName), %% For crash recovery
+ _ = BQ:delete_crashed(Q), %% For crash recovery
BQS = bq_init(BQ, Q1, new),
State = #state { q = Q1,
gm = GM,
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index ae8a38daf0..b7a3afd129 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -43,7 +43,7 @@
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
--record(state, {bq, bqss}).
+-record(state, {bq, bqss, max_priority}).
-record(passthrough, {bq, bqs}).
%% See 'note on suffixes' below
@@ -157,7 +157,8 @@ init(Q, Recover, AsyncCallback) ->
[{P, Init(P, Term)} || {P, Term} <- PsTerms]
end,
#state{bq = BQ,
- bqss = BQSs}
+ bqss = BQSs,
+ max_priority = hd(Ps)}
end.
%% [0] collapse_recovery has the effect of making a list of recovery
%% terms in priority order, even for non priority queues. It's easier
@@ -419,6 +420,8 @@ info(Item, #passthrough{bq = BQ, bqs = BQS}) ->
invoke(Mod, {P, Fun}, State = #state{bq = BQ}) ->
pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State);
+invoke(Mod, Fun, State = #state{bq = BQ, max_priority = P}) ->
+ pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State);
invoke(Mod, Fun, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(invoke(Mod, Fun, BQS)).
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 56b44d423e..46fafd89f7 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -29,32 +29,28 @@ all() ->
groups() ->
[
- {cluster_size_2, [], [
- {parallel_tests, [parallel], [
- ackfold,
- drop,
- dropwhile_fetchwhile,
- info_head_message_timestamp,
- matching,
- mirror_queue_sync,
- mirror_queue_sync_priority_above_max,
- mirror_queue_sync_priority_above_max_pending_ack,
- mirror_queue_sync_order,
- purge,
- requeue,
- resume,
- simple_order,
- straight_through
- ]},
- {non_parallel_tests, [], [
- recovery %% Restart RabbitMQ.
- ]}
- ]},
- {cluster_size_3, [], [
- {parallel_tests, [parallel], [
- mirror_queue_auto_ack
- ]}
- ]}
+ {cluster_size_2, [], [
+ ackfold,
+ drop,
+ dropwhile_fetchwhile,
+ info_head_message_timestamp,
+ matching,
+ mirror_queue_sync,
+ mirror_queue_sync_priority_above_max,
+ mirror_queue_sync_priority_above_max_pending_ack,
+ mirror_queue_sync_order,
+ purge,
+ requeue,
+ resume,
+ simple_order,
+ straight_through,
+ invoke
+ ]},
+ {cluster_size_3, [], [
+ mirror_queue_auto_ack,
+ mirror_fast_reset_policy,
+ mirror_reset_policy
+ ]}
].
%% -------------------------------------------------------------------
@@ -69,35 +65,35 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(cluster_size_2, Config) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodes_count, 2}
+ {rmq_nodes_count, 2},
+ {rmq_nodename_suffix, Suffix}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
init_per_group(cluster_size_3, Config) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodes_count, 3}
+ {rmq_nodes_count, 3},
+ {rmq_nodename_suffix, Suffix}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps());
-init_per_group(_, Config) ->
- Config.
+ rabbit_ct_client_helpers:setup_steps()).
-end_per_group(ClusterSizeGroup, Config)
-when ClusterSizeGroup =:= cluster_size_2
-orelse ClusterSizeGroup =:= cluster_size_3 ->
+end_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
- rabbit_ct_broker_helpers:teardown_steps());
-end_per_group(_, Config) ->
- Config.
+ rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
+ rabbit_ct_client_helpers:setup_steps(),
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
+ rabbit_ct_client_helpers:teardown_steps(),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
%% -------------------------------------------------------------------
@@ -135,25 +131,8 @@ end_per_testcase(Testcase, Config) ->
%%
%% [0] publish enough to get credit flow from msg store
-recovery(Config) ->
- {Conn, Ch} = open(Config),
- Q = <<"recovery-queue">>,
- declare(Ch, Q, 3),
- publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
- amqp_connection:close(Conn),
-
- %% TODO This terminates the automatically open connection and breaks
- %% coverage.
- rabbit_ct_broker_helpers:restart_broker(Config, 0),
-
- {Conn2, Ch2} = open(Config),
- get_all(Ch2, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
- delete(Ch2, Q),
- amqp_connection:close(Conn2),
- passed.
-
simple_order(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"simple_order-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
@@ -164,10 +143,11 @@ simple_order(Config) ->
get_all(Ch, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
matching(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"matching-queue">>,
declare(Ch, Q, 5),
%% We round priority down, and 0 is the default
@@ -175,10 +155,11 @@ matching(Config) ->
get_all(Ch, Q, do_ack, [5, 10, undefined, 0, undefined]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
resume(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"resume-queue">>,
declare(Ch, Q, 5),
amqp_channel:call(Ch, #'confirm.select'{}),
@@ -187,10 +168,11 @@ resume(Config) ->
amqp_channel:call(Ch, #'queue.purge'{queue = Q}), %% Assert it exists
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
straight_through(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"straight_through-queue">>,
declare(Ch, Q, 3),
[begin
@@ -204,10 +186,35 @@ straight_through(Config) ->
get_empty(Ch, Q),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
+invoke(Config) ->
+ %% Synthetic test to check the invoke callback, as the bug tested here
+ %% is only triggered with a race condition.
+ %% When mirroring is stopped, the backing queue of rabbit_amqqueue_process
+ %% changes from rabbit_mirror_queue_master to rabbit_priority_queue,
+ %% which shouldn't receive any invoke call. However, there might
+ %% be pending messages so the priority queue receives the
+ %% `run_backing_queue` cast message sent to the old master.
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
+ Q = <<"invoke-queue">>,
+ declare(Ch, Q, 3),
+ Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ rabbit_ct_broker_helpers:rpc(
+ Config, A, gen_server, cast,
+ [Pid,
+ {run_backing_queue, ?MODULE, fun(_, _) -> ok end}]),
+ Pid2 = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ Pid = Pid2,
+ delete(Ch, Q),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
dropwhile_fetchwhile(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"dropwhile_fetchwhile-queue">>,
[begin
declare(Ch, Q, Args ++ arguments(3)),
@@ -221,10 +228,11 @@ dropwhile_fetchwhile(Config) ->
{<<"x-dead-letter-exchange">>, longstr, <<"amq.fanout">>}]
]],
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
ackfold(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"ackfolq-queue1">>,
Q2 = <<"ackfold-queue2">>,
declare(Ch, Q,
@@ -242,10 +250,11 @@ ackfold(Config) ->
delete(Ch, Q),
delete(Ch, Q2),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
requeue(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"requeue-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
@@ -256,10 +265,11 @@ requeue(Config) ->
get_all(Ch, Q, do_ack, [3, 2, 1]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
drop(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"drop-queue">>,
declare(Ch, Q, [{<<"x-max-length">>, long, 4} | arguments(3)]),
publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
@@ -268,10 +278,11 @@ drop(Config) ->
get_all(Ch, Q, do_ack, [2, 1, 1, 1]),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
purge(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"purge-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
@@ -279,6 +290,7 @@ purge(Config) ->
get_empty(Ch, Q),
delete(Ch, Q),
rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
info_head_message_timestamp(Config) ->
@@ -349,7 +361,7 @@ ram_duration(_Config) ->
passed.
mirror_queue_sync(Config) ->
- Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"mirror_queue_sync-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
@@ -363,13 +375,14 @@ mirror_queue_sync(Config) ->
rabbit_ct_broker_helpers:control_action(sync_queue, Nodename0,
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, Nodename0, rabbit_misc:r(<<"/">>, queue, Q)),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_sync_priority_above_max(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
%% Tests synchronisation of slaves when priority is higher than max priority.
%% This causes an infinity loop (and test timeout) before rabbitmq-server-795
- Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_queue_sync_priority_above_max-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [5, 5, 5]),
@@ -379,6 +392,7 @@ mirror_queue_sync_priority_above_max(Config) ->
[binary_to_list(Q)], [{"-p", "/"}]),
wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
delete(Ch, Q),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_sync_priority_above_max_pending_ack(Config) ->
@@ -386,7 +400,7 @@ mirror_queue_sync_priority_above_max_pending_ack(Config) ->
%% Tests synchronisation of slaves when priority is higher than max priority
%% and there are pending acks.
%% This causes an infinity loop (and test timeout) before rabbitmq-server-795
- Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_queue_sync_priority_above_max_pending_ack-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [5, 5, 5]),
@@ -401,6 +415,7 @@ mirror_queue_sync_priority_above_max_pending_ack(Config) ->
synced_msgs(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 3),
synced_msgs(Config, B, rabbit_misc:r(<<"/">>, queue, Q), 3),
delete(Ch, Q),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_auto_ack(Config) ->
@@ -410,7 +425,7 @@ mirror_queue_auto_ack(Config) ->
%% the slaves will crash with the depth notification as they will not
%% match the master delta.
%% Bug rabbitmq-server 687
- Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Q = <<"mirror_queue_auto_ack-queue">>,
declare(Ch, Q, 3),
publish(Ch, Q, [1, 2, 3]),
@@ -431,13 +446,15 @@ mirror_queue_auto_ack(Config) ->
SPid2 = proplists:get_value(SNode2, Slaves),
delete(Ch, Q),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
passed.
mirror_queue_sync_order(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
- Ch = rabbit_ct_client_helpers:open_channel(Config, A),
- Ch2 = rabbit_ct_client_helpers:open_channel(Config, B),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
+ {Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, B),
Q = <<"mirror_queue_sync_order-queue">>,
declare(Ch, Q, 3),
publish_payload(Ch, Q, [{1, <<"msg1">>}, {2, <<"msg2">>},
@@ -459,11 +476,54 @@ mirror_queue_sync_order(Config) ->
<<"msg4">>, <<"msg1">>]),
delete(Ch2, Q),
+ rabbit_ct_broker_helpers:start_node(Config, A),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ rabbit_ct_client_helpers:close_connection(Conn2),
passed.
-%%----------------------------------------------------------------------------
-open(Config) ->
- rabbit_ct_client_helpers:open_connection_and_channel(Config, 0).
+mirror_reset_policy(Config) ->
+ %% Gives time to the master to go through all stages.
+ %% Might eventually trigger some race conditions from #802,
+ %% although for that I would expect a longer run and higher
+ %% number of messages in the system.
+ mirror_reset_policy(Config, 5000).
+
+mirror_fast_reset_policy(Config) ->
+ %% This test seems to trigger the bug tested in invoke/1, but it
+ %% cannot guarantee it will always happen. Thus, both tests
+ %% should stay in the test suite.
+ mirror_reset_policy(Config, 5).
+
+
+mirror_reset_policy(Config, Wait) ->
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
+ Q = <<"mirror_reset_policy-queue">>,
+ declare(Ch, Q, 5),
+ Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ publish_many(Ch, Q, 20000),
+ [begin
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ timer:sleep(Wait),
+ rabbit_ct_broker_helpers:clear_policy(
+ Config, A, <<"^mirror_reset_policy-queue$">>),
+ timer:sleep(Wait)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(1000),
+ ok = rabbit_ct_broker_helpers:set_ha_policy(
+ Config, A, <<"^mirror_reset_policy-queue$">>, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
+ %% Verify master has not crashed
+ Pid = queue_pid(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+ delete(Ch, Q),
+
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
+%%----------------------------------------------------------------------------
declare(Ch, Q, Args) when is_list(Args) ->
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
@@ -564,18 +624,26 @@ priority2bin(Int) -> list_to_binary(integer_to_list(Int)).
%%----------------------------------------------------------------------------
wait_for_sync(Config, Nodename, Q) ->
- case synced(Config, Nodename, Q) of
+ wait_for_sync(Config, Nodename, Q, 1).
+
+wait_for_sync(Config, Nodename, Q, Nodes) ->
+ wait_for_sync(Config, Nodename, Q, Nodes, 600).
+
+wait_for_sync(_, _, _, _, 0) ->
+ throw(sync_timeout);
+wait_for_sync(Config, Nodename, Q, Nodes, N) ->
+ case synced(Config, Nodename, Q, Nodes) of
true -> ok;
false -> timer:sleep(100),
- wait_for_sync(Config, Nodename, Q)
+ wait_for_sync(Config, Nodename, Q, Nodes, N-1)
end.
-synced(Config, Nodename, Q) ->
+synced(Config, Nodename, Q, Nodes) ->
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
rabbit_amqqueue, info_all, [<<"/">>, [name, synchronised_slave_pids]]),
[SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info,
Q =:= Q1],
- length(SSPids) =:= 1.
+ length(SSPids) =:= Nodes.
synced_msgs(Config, Nodename, Q, Expected) ->
Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
@@ -593,4 +661,11 @@ slave_pids(Config, Nodename, Q) ->
Q =:= Q1],
SPids.
+queue_pid(Config, Nodename, Q) ->
+ Info = rabbit_ct_broker_helpers:rpc(
+ Config, Nodename,
+ rabbit_amqqueue, info_all, [<<"/">>, [name, pid]]),
+ [Pid] = [P || [{name, Q1}, {pid, P}] <- Info, Q =:= Q1],
+ Pid.
+
%%----------------------------------------------------------------------------
diff --git a/test/priority_queue_recovery_SUITE.erl b/test/priority_queue_recovery_SUITE.erl
new file mode 100644
index 0000000000..9e2ffbd3fe
--- /dev/null
+++ b/test/priority_queue_recovery_SUITE.erl
@@ -0,0 +1,153 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(priority_queue_recovery_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ recovery %% Restart RabbitMQ.
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 2}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+recovery(Config) ->
+ {Conn, Ch} = open(Config),
+ Q = <<"recovery-queue">>,
+ declare(Ch, Q, 3),
+ publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+
+ rabbit_ct_broker_helpers:restart_broker(Config, 0),
+
+ {Conn2, Ch2} = open(Config, 1),
+ get_all(Ch2, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
+ delete(Ch2, Q),
+ rabbit_ct_client_helpers:close_channel(Ch2),
+ rabbit_ct_client_helpers:close_connection(Conn2),
+ passed.
+
+
+%%----------------------------------------------------------------------------
+
+open(Config) ->
+ open(Config, 0).
+
+open(Config, NodeIndex) ->
+ rabbit_ct_client_helpers:open_connection_and_channel(Config, NodeIndex).
+
+declare(Ch, Q, Args) when is_list(Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ arguments = Args});
+declare(Ch, Q, Max) ->
+ declare(Ch, Q, arguments(Max)).
+
+delete(Ch, Q) ->
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
+
+publish(Ch, Q, Ps) ->
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ [publish1(Ch, Q, P) || P <- Ps],
+ amqp_channel:wait_for_confirms(Ch).
+
+publish1(Ch, Q, P) ->
+ amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
+ #amqp_msg{props = props(P),
+ payload = priority2bin(P)}).
+
+publish1(Ch, Q, P, Pd) ->
+ amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
+ #amqp_msg{props = props(P),
+ payload = Pd}).
+
+get_all(Ch, Q, Ack, Ps) ->
+ DTags = get_partial(Ch, Q, Ack, Ps),
+ get_empty(Ch, Q),
+ DTags.
+
+get_partial(Ch, Q, Ack, Ps) ->
+ [get_ok(Ch, Q, Ack, priority2bin(P)) || P <- Ps].
+
+get_empty(Ch, Q) ->
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}).
+
+get_ok(Ch, Q, Ack, PBin) ->
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = Q,
+ no_ack = Ack =:= no_ack}),
+ PBin = PBin2,
+ maybe_ack(Ch, Ack, DTag).
+
+maybe_ack(Ch, do_ack, DTag) ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}),
+ DTag;
+maybe_ack(_Ch, _, DTag) ->
+ DTag.
+
+arguments(none) -> [];
+arguments(Max) -> [{<<"x-max-priority">>, byte, Max}].
+
+priority2bin(undefined) -> <<"undefined">>;
+priority2bin(Int) -> list_to_binary(integer_to_list(Int)).
+
+props(undefined) -> #'P_basic'{delivery_mode = 2};
+props(P) -> #'P_basic'{priority = P,
+ delivery_mode = 2}.