summaryrefslogtreecommitdiff
path: root/test/queue_parallel_SUITE.erl
blob: bbefa625bc25e4d60855c1b9a4d310fd42ec28a9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%
%%
-module(queue_parallel_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile(export_all).

-define(TIMEOUT, 30000).

-import(quorum_queue_utils, [wait_for_messages/2]).

%% Common Test entry point: the suite consists of one top-level group.
all() ->
    [{group, parallel_tests}].

%% Group layout. A single outer group (`parallel_tests') wraps one parallel
%% sub-group per queue flavour. Every flavour runs the shared test list; the
%% classic and mirrored flavours add the "delete by pid succeeds" and message
%% store compaction tests, while the quorum flavours add the "delete by pid
%% fails" test instead.
groups() ->
    Shared = [publish,
              consume,
              consume_first_empty,
              consume_from_empty_queue,
              consume_and_autoack,
              subscribe,
              subscribe_with_autoack,
              consume_and_ack,
              consume_and_multiple_ack,
              subscribe_and_ack,
              subscribe_and_multiple_ack,
              subscribe_and_requeue_multiple_nack,
              subscribe_and_nack,
              subscribe_and_requeue_nack,
              subscribe_and_multiple_nack,
              consume_and_requeue_nack,
              consume_and_nack,
              consume_and_requeue_multiple_nack,
              consume_and_multiple_nack,
              basic_cancel,
              purge,
              basic_recover,
              delete_immediately_by_resource],
    ClassicTests = Shared ++ [delete_immediately_by_pid_succeeds,
                              trigger_message_store_compaction],
    QuorumTests = Shared ++ [delete_immediately_by_pid_fails],
    SubGroups = [{classic_queue, [parallel], ClassicTests},
                 {mirrored_queue, [parallel], ClassicTests},
                 {quorum_queue, [parallel], QuorumTests},
                 {quorum_queue_in_memory_limit, [parallel], QuorumTests},
                 {quorum_queue_in_memory_bytes, [parallel], QuorumTests}],
    [{parallel_tests, [], SubGroups}].

%% Suite-wide Common Test settings: each testcase is bounded by a
%% three-minute timetrap.
suite() ->
    [{timetrap, {minutes, 3}}].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

%% Suite setup: log the test environment, then run the default setup steps
%% and return the resulting config.
init_per_suite(Config0) ->
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(Config0).

%% Suite teardown: run the default teardown steps over the suite config.
end_per_suite(SuiteConfig) ->
    rabbit_ct_helpers:run_teardown_steps(SuiteConfig).

%% Per-group setup.
%%
%% The queue-flavour groups only record how queues must be declared
%% (`queue_args' and `queue_durable' in the config); flavours that depend on
%% a feature flag return whatever enabling the flag returned when it is not
%% `ok'. Any other group is checked against all/0: a top-level group starts a
%% three-node cluster plus client helpers, anything else just runs an empty
%% step list.
init_per_group(classic_queue, Config) ->
    QueueArgs = [{<<"x-queue-type">>, longstr, <<"classic">>}],
    rabbit_ct_helpers:set_config(
      Config, [{queue_args, QueueArgs}, {queue_durable, true}]);
init_per_group(quorum_queue, Config) ->
    init_feature_flagged_group(
      Config, quorum_queue,
      [{<<"x-queue-type">>, longstr, <<"quorum">>}]);
init_per_group(quorum_queue_in_memory_limit, Config) ->
    init_feature_flagged_group(
      Config, quorum_queue,
      [{<<"x-queue-type">>, longstr, <<"quorum">>},
       {<<"x-max-in-memory-length">>, long, 1}]);
init_per_group(quorum_queue_in_memory_bytes, Config) ->
    init_feature_flagged_group(
      Config, quorum_queue,
      [{<<"x-queue-type">>, longstr, <<"quorum">>},
       {<<"x-max-in-memory-bytes">>, long, 1}]);
init_per_group(mirrored_queue, Config) ->
    %% NOTE(review): init_per_testcase/2 names queues "<Group>_<Testcase>",
    %% so this policy pattern does not look like it can match them - confirm
    %% that mirroring is really applied to the queues of this group.
    rabbit_ct_broker_helpers:set_ha_policy(
      Config, 0, <<"^max_length.*queue">>, <<"all">>,
      [{<<"ha-sync-mode">>, <<"automatic">>}]),
    ClassicArgs = [{<<"x-queue-type">>, longstr, <<"classic">>}],
    MirroredConfig = rabbit_ct_helpers:set_config(
                       Config, [{is_mirrored, true},
                                {queue_args, ClassicArgs},
                                {queue_durable, true}]),
    rabbit_ct_helpers:run_steps(MirroredConfig, []);
init_per_group(stream_queue, Config) ->
    init_feature_flagged_group(
      Config, stream_queue,
      [{<<"x-queue-type">>, longstr, <<"stream">>}]);
init_per_group(Group, Config0) ->
    case lists:member({group, Group}, all()) of
        false ->
            rabbit_ct_helpers:run_steps(Config0, []);
        true ->
            NodeCount = 3,
            TickEnv = {rabbit, [{channel_tick_interval, 1000},
                                {quorum_tick_interval, 1000},
                                {stream_tick_interval, 1000}]},
            WithEnv = rabbit_ct_helpers:merge_app_env(Config0, TickEnv),
            WithNodes = rabbit_ct_helpers:set_config(
                          WithEnv, [{rmq_nodename_suffix, Group},
                                    {rmq_nodes_count, NodeCount}]),
            SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                         rabbit_ct_client_helpers:setup_steps(),
            rabbit_ct_helpers:run_steps(WithNodes, SetupSteps)
    end.

%% Enables FeatureFlag and, when that returns `ok', stores the declaration
%% settings for durable queues using QueueArgs. Any other return value from
%% enabling the flag is handed back unchanged.
init_feature_flagged_group(Config, FeatureFlag, QueueArgs) ->
    case rabbit_ct_broker_helpers:enable_feature_flag(Config, FeatureFlag) of
        ok ->
            rabbit_ct_helpers:set_config(
              Config, [{queue_args, QueueArgs}, {queue_durable, true}]);
        Skip ->
            Skip
    end.

%% Per-group teardown: only groups listed in all/0 own a broker cluster, so
%% only those run the client and broker teardown steps. Nested groups return
%% the config untouched.
end_per_group(Group, Config) ->
    case lists:member({group, Group}, all()) of
        false ->
            Config;
        true ->
            TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                            rabbit_ct_broker_helpers:teardown_steps(),
            rabbit_ct_helpers:run_steps(Config, TeardownSteps)
    end.

%% Per-testcase setup: derives two queue names from the enclosing group name
%% and the testcase name ("<Group>_<Testcase>" and "<Group>_<Testcase>_2"),
%% stores them as `queue_name' and `queue_name_2', and marks the testcase as
%% started.
init_per_testcase(Testcase, Config) ->
    GroupProps = ?config(tc_group_properties, Config),
    Group = proplists:get_value(name, GroupProps),
    NameParts = [Group, Testcase],
    Primary = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", NameParts)),
    Secondary = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", NameParts)),
    WithQueues = rabbit_ct_helpers:set_config(
                   Config, [{queue_name, Primary},
                            {queue_name_2, Secondary}]),
    rabbit_ct_helpers:testcase_started(WithQueues, Testcase).

%% Per-testcase teardown: deletes both queues named by init_per_testcase/2
%% and marks the testcase as finished.
%%
%% The connection and channel opened for the cleanup are closed again before
%% returning; previously they were left open, leaking one connection per
%% testcase for the lifetime of the group.
end_per_testcase(Testcase, Config) ->
    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
    amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
    rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).

%% Publishes one message and waits until the queue reports the counters
%% 1/1/0 (presumably total/ready/unacked - confirm against
%% quorum_queue_utils:wait_for_messages/2).
publish(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),
    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"1">>, <<"0">>]]).

%% Fetches a message with basic.get (no auto-ack) and never acknowledges it.
%% The test expects the queue counters to go back to their post-publish
%% values once the channel has been closed.
consume(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),
    AfterPublish = [[Queue, <<"1">>, <<"1">>, <<"0">>]],
    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, AfterPublish),
    consume(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"0">>, <<"1">>]]),
    rabbit_ct_client_helpers:close_channel(Chan),
    wait_for_messages(Config, AfterPublish).

%% A basic.get on the freshly declared queue must come back empty; after a
%% publish, a second basic.get (auto-ack) must return the message.
consume_first_empty(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),
    consume_empty(Chan, Queue),
    Payloads = [<<"msg1">>],
    publish(Chan, Queue, Payloads),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"1">>, <<"0">>]]),
    consume(Chan, Queue, true, Payloads),
    rabbit_ct_client_helpers:close_channel(Chan).

%% A basic.get on a freshly declared queue must answer basic.get_empty.
consume_from_empty_queue(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),
    consume_empty(Chan, Queue).

%% Fetches a message with basic.get in auto-ack mode. The queue is expected
%% to report all-zero counters right after the get and still after the
%% channel has been closed.
consume_and_autoack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),
    Drained = [[Queue, <<"0">>, <<"0">>, <<"0">>]],
    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"1">>, <<"0">>]]),
    consume(Chan, Queue, true, [<<"msg1">>]),
    wait_for_messages(Config, Drained),
    rabbit_ct_client_helpers:close_channel(Chan),
    wait_for_messages(Config, Drained).

%% Subscribes with manual acks and a prefetch of 10, receives the published
%% message, and then checks the consumer record reported by
%% rabbit_amqqueue:consumers_all/1 on the first node. The message is never
%% acked, so the counters are expected to return to their post-publish
%% values once the channel is closed.
subscribe(Config) ->
    [Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    Queue = ?config(queue_name, Config),
    declare_queue(Chan, Config, Queue),

    Qos = #'basic.qos'{global = false,
                       prefetch_count = 10},
    ?assertMatch(#'basic.qos_ok'{}, amqp_channel:call(Chan, Qos)),
    AfterPublish = [[Queue, <<"1">>, <<"1">>, <<"0">>]],
    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, AfterPublish),

    subscribe(Chan, Queue, false),
    receive_basic_deliver(false),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"0">>, <<"1">>]]),

    %% The consumer listing covers the whole vhost; keep only the entry that
    %% belongs to this testcase's queue.
    AllConsumers = rpc:call(Node, rabbit_amqqueue, consumers_all, [<<"/">>]),
    OnThisQueue = fun(Props) ->
                          Resource = proplists:get_value(queue_name, Props),
                          Queue == Resource#resource.name
                  end,
    [Consumer] = lists:filter(OnThisQueue, AllConsumers),
    ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
    ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
    ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
    ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
    ?assertEqual([], proplists:get_value(arguments, Consumer)),

    rabbit_ct_client_helpers:close_channel(Chan),
    wait_for_messages(Config, AfterPublish).

%% Subscribes in auto-ack mode to a queue holding two messages. Both
%% deliveries must arrive (not flagged as redelivered) and the queue is
%% expected to report all-zero counters before and after the channel closes.
subscribe_with_autoack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    Drained = [[Queue, <<"0">>, <<"0">>, <<"0">>]],
    publish(Chan, Queue, [<<"msg1">>, <<"msg2">>]),
    wait_for_messages(Config, [[Queue, <<"2">>, <<"2">>, <<"0">>]]),
    subscribe(Chan, Queue, true),
    receive_basic_deliver(false),
    receive_basic_deliver(false),
    wait_for_messages(Config, Drained),
    rabbit_ct_client_helpers:close_channel(Chan),
    wait_for_messages(Config, Drained).

%% Fetches one message with basic.get and acks it; the queue is expected to
%% end up with all-zero counters.
consume_and_ack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"1">>, <<"0">>]]),
    [Tag] = consume(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"0">>, <<"1">>]]),
    Ack = #'basic.ack'{delivery_tag = Tag},
    amqp_channel:cast(Chan, Ack),
    wait_for_messages(Config, [[Queue, <<"0">>, <<"0">>, <<"0">>]]),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Fetches three messages with basic.get and acknowledges all of them with a
%% single `multiple' ack on the last delivery tag.
consume_and_multiple_ack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    Payloads = [<<"msg1">>, <<"msg2">>, <<"msg3">>],
    publish(Chan, Queue, Payloads),
    wait_for_messages(Config, [[Queue, <<"3">>, <<"3">>, <<"0">>]]),
    [_, _, LastTag] = consume(Chan, Queue, Payloads),
    wait_for_messages(Config, [[Queue, <<"3">>, <<"0">>, <<"3">>]]),
    Ack = #'basic.ack'{delivery_tag = LastTag,
                       multiple     = true},
    amqp_channel:cast(Chan, Ack),
    wait_for_messages(Config, [[Queue, <<"0">>, <<"0">>, <<"0">>]]),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Subscribes with manual acks, acks the single delivery and expects the
%% queue to end up with all-zero counters.
%%
%% The wait for the delivery is bounded by ?TIMEOUT so that a missing
%% delivery fails the test with a descriptive reason instead of blocking
%% until the suite timetrap fires.
subscribe_and_ack(Config) ->
    {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QName = ?config(queue_name, Config),
    declare_queue(Ch, Config, QName),

    publish(Ch, QName, [<<"msg1">>]),
    wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
    subscribe(Ch, QName, false),
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
            wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
            amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
            wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
    after ?TIMEOUT ->
            exit(basic_deliver_timeout)
    end,
    rabbit_ct_client_helpers:close_channel(Ch),
    ok.

%% Subscribes with manual acks to a queue holding three messages and
%% acknowledges all of them with one `multiple' ack on the third delivery.
%%
%% The wait for the third delivery is bounded by ?TIMEOUT so that a missing
%% delivery fails the test with a descriptive reason instead of blocking
%% until the suite timetrap fires.
subscribe_and_multiple_ack(Config) ->
    {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QName = ?config(queue_name, Config),
    declare_queue(Ch, Config, QName),

    publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
    wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
    subscribe(Ch, QName, false),
    receive_basic_deliver(false),
    receive_basic_deliver(false),
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
            wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
            amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
                                               multiple     = true}),
            wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
    after ?TIMEOUT ->
            exit(basic_deliver_timeout)
    end,
    rabbit_ct_client_helpers:close_channel(Ch),
    ok.

%% Publishes 12000 messages of 5000 bytes each, fetches all of them without
%% acknowledging, then acks only the delivery tags strictly between 500 and
%% 11200. After a pause the queue is purged and the channel closed.
%% NOTE(review): the intent appears to be leaving a large acked "hole" so the
%% message store compacts its files - nothing here asserts that it did.
trigger_message_store_compaction(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    Count = 12000,
    CountBin = integer_to_binary(Count),
    lists:foreach(fun(_) ->
                          publish(Chan, Queue, [binary:copy(<<"a">>, 5000)])
                  end, lists:seq(1, Count)),
    wait_for_messages(Config, [[Queue, CountBin, CountBin, <<"0">>]]),

    Tags = rabbit_ct_client_helpers:consume_without_acknowledging(Chan, Queue, Count),
    MiddleTags = [Tag || Tag <- Tags, Tag > 500 andalso Tag < 11200],

    lists:foreach(fun(Tag) ->
                          amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = Tag,
                                                               multiple     = false})
                  end, MiddleTags),

    %% give compaction a moment to start and finish
    timer:sleep(5000),
    amqp_channel:cast(Chan, #'queue.purge'{queue = Queue}),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Subscribes with manual acks to a queue holding three messages, nacks all
%% of them with requeue (one `multiple' nack on the third delivery), expects
%% three redeliveries and finally acks them all with one `multiple' ack.
%%
%% Both waits for a delivery are bounded by ?TIMEOUT so that a missing
%% (re)delivery fails the test with a descriptive reason instead of blocking
%% until the suite timetrap fires.
subscribe_and_requeue_multiple_nack(Config) ->
    {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QName = ?config(queue_name, Config),
    declare_queue(Ch, Config, QName),

    publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
    wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
    subscribe(Ch, QName, false),
    receive_basic_deliver(false),
    receive_basic_deliver(false),
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag,
                          redelivered  = false}, _} ->
            wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
            amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
                                                multiple     = true,
                                                requeue      = true}),
            receive_basic_deliver(true),
            receive_basic_deliver(true),
            receive
                {#'basic.deliver'{delivery_tag = DeliveryTag1,
                                  redelivered  = true}, _} ->
                    wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
                    amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
                                                       multiple     = true}),
                    wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
            after ?TIMEOUT ->
                    exit(basic_redeliver_timeout)
            end
    after ?TIMEOUT ->
            exit(basic_deliver_timeout)
    end,
    rabbit_ct_client_helpers:close_channel(Ch),
    ok.

%% Fetches the first of two messages with basic.get and nacks it with
%% requeue; the queue counters are expected to return to their post-publish
%% values.
consume_and_requeue_nack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    AfterPublish = [[Queue, <<"2">>, <<"2">>, <<"0">>]],
    publish(Chan, Queue, [<<"msg1">>, <<"msg2">>]),
    wait_for_messages(Config, AfterPublish),
    [Tag] = consume(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"2">>, <<"1">>, <<"1">>]]),
    Nack = #'basic.nack'{delivery_tag = Tag,
                         multiple     = false,
                         requeue      = true},
    amqp_channel:cast(Chan, Nack),
    wait_for_messages(Config, AfterPublish),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Fetches one message with basic.get and nacks it without requeue; the
%% queue is expected to end up with all-zero counters.
consume_and_nack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"1">>, <<"0">>]]),
    [Tag] = consume(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"0">>, <<"1">>]]),
    Nack = #'basic.nack'{delivery_tag = Tag,
                         multiple     = false,
                         requeue      = false},
    amqp_channel:cast(Chan, Nack),
    wait_for_messages(Config, [[Queue, <<"0">>, <<"0">>, <<"0">>]]),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Fetches three messages with basic.get and nacks all of them with requeue
%% through one `multiple' nack on the last delivery tag; the queue counters
%% are expected to return to their post-publish values.
consume_and_requeue_multiple_nack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    Payloads = [<<"msg1">>, <<"msg2">>, <<"msg3">>],
    AfterPublish = [[Queue, <<"3">>, <<"3">>, <<"0">>]],
    publish(Chan, Queue, Payloads),
    wait_for_messages(Config, AfterPublish),
    [_, _, LastTag] = consume(Chan, Queue, Payloads),
    wait_for_messages(Config, [[Queue, <<"3">>, <<"0">>, <<"3">>]]),
    Nack = #'basic.nack'{delivery_tag = LastTag,
                         multiple     = true,
                         requeue      = true},
    amqp_channel:cast(Chan, Nack),
    wait_for_messages(Config, AfterPublish),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Fetches three messages with basic.get and nacks all of them without
%% requeue through one `multiple' nack on the last delivery tag; the queue
%% is expected to end up with all-zero counters.
consume_and_multiple_nack(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    Payloads = [<<"msg1">>, <<"msg2">>, <<"msg3">>],
    publish(Chan, Queue, Payloads),
    wait_for_messages(Config, [[Queue, <<"3">>, <<"3">>, <<"0">>]]),
    [_, _, LastTag] = consume(Chan, Queue, Payloads),
    wait_for_messages(Config, [[Queue, <<"3">>, <<"0">>, <<"3">>]]),
    Nack = #'basic.nack'{delivery_tag = LastTag,
                         multiple     = true,
                         requeue      = false},
    amqp_channel:cast(Chan, Nack),
    wait_for_messages(Config, [[Queue, <<"0">>, <<"0">>, <<"0">>]]),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Subscribes with manual acks, nacks the single delivery with requeue,
%% expects it to come back flagged as redelivered and then acks it.
%%
%% Both waits for a delivery are bounded by ?TIMEOUT so that a missing
%% (re)delivery fails the test with a descriptive reason instead of blocking
%% until the suite timetrap fires.
subscribe_and_requeue_nack(Config) ->
    {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QName = ?config(queue_name, Config),
    declare_queue(Ch, Config, QName),

    publish(Ch, QName, [<<"msg1">>]),
    wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
    subscribe(Ch, QName, false),
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag,
                          redelivered  = false}, _} ->
            wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
            amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
                                                multiple     = false,
                                                requeue      = true}),
            receive
                {#'basic.deliver'{delivery_tag = DeliveryTag1,
                                  redelivered  = true}, _} ->
                    wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
                    amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
                    wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
            after ?TIMEOUT ->
                    exit(basic_redeliver_timeout)
            end
    after ?TIMEOUT ->
            exit(basic_deliver_timeout)
    end,
    rabbit_ct_client_helpers:close_channel(Ch),
    ok.

%% Subscribes with manual acks and nacks the single delivery without
%% requeue; the queue is expected to end up with all-zero counters.
%%
%% The wait for the delivery is bounded by ?TIMEOUT so that a missing
%% delivery fails the test with a descriptive reason instead of blocking
%% until the suite timetrap fires.
subscribe_and_nack(Config) ->
    {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QName = ?config(queue_name, Config),
    declare_queue(Ch, Config, QName),

    publish(Ch, QName, [<<"msg1">>]),
    wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
    subscribe(Ch, QName, false),
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag,
                          redelivered  = false}, _} ->
            wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
            amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
                                                multiple     = false,
                                                requeue      = false}),
            wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
    after ?TIMEOUT ->
            exit(basic_deliver_timeout)
    end,
    rabbit_ct_client_helpers:close_channel(Ch),
    ok.

%% Subscribes with manual acks to a queue holding three messages and nacks
%% all of them without requeue through one `multiple' nack on the third
%% delivery; the queue is expected to end up with all-zero counters.
%%
%% The wait for the third delivery is bounded by ?TIMEOUT so that a missing
%% delivery fails the test with a descriptive reason instead of blocking
%% until the suite timetrap fires.
subscribe_and_multiple_nack(Config) ->
    {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QName = ?config(queue_name, Config),
    declare_queue(Ch, Config, QName),

    publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
    wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
    subscribe(Ch, QName, false),
    receive_basic_deliver(false),
    receive_basic_deliver(false),
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag,
                          redelivered  = false}, _} ->
            wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
            amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
                                                multiple     = true,
                                                requeue      = false}),
            wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
    after ?TIMEOUT ->
            exit(basic_deliver_timeout)
    end,
    rabbit_ct_client_helpers:close_channel(Ch),
    ok.

%% TODO test with single active
%% Cancels a consumer while it still holds an unacked delivery. After the
%% cancel the consumer must no longer be listed for this queue, the counters
%% are expected to stay at 1/0/1, newly published messages are not delivered
%% (counters 3/2/1), and acking the outstanding delivery leaves 2/2/0.
%% Exits with `basic_deliver_timeout' if no delivery arrives within 5s.
basic_cancel(Config) ->
    [Node | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    Queue = ?config(queue_name, Config),
    declare_queue(Chan, Config, Queue),

    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"1">>, <<"0">>]]),
    Tag = atom_to_binary(?FUNCTION_NAME, utf8),
    OnThisQueue = fun(Props) ->
                          Resource = proplists:get_value(queue_name, Props),
                          Queue == Resource#resource.name
                  end,
    OneUnacked = [[Queue, <<"1">>, <<"0">>, <<"1">>]],

    %% Let's set consumer prefetch so it works with stream queues
    Qos = #'basic.qos'{global = false,
                       prefetch_count = 1},
    ?assertMatch(#'basic.qos_ok'{}, amqp_channel:call(Chan, Qos)),
    subscribe(Chan, Queue, false, Tag),
    receive
        {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
            wait_for_messages(Config, OneUnacked),
            amqp_channel:call(Chan, #'basic.cancel'{consumer_tag = Tag}),
            Remaining = rpc:call(Node, rabbit_amqqueue, consumers_all, [<<"/">>]),
            wait_for_messages(Config, OneUnacked),
            ?assertEqual([], lists:filter(OnThisQueue, Remaining)),
            publish(Chan, Queue, [<<"msg2">>, <<"msg3">>]),
            wait_for_messages(Config, [[Queue, <<"3">>, <<"2">>, <<"1">>]]),
            amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = DeliveryTag}),
            wait_for_messages(Config, [[Queue, <<"2">>, <<"2">>, <<"0">>]])
    after 5000 ->
              exit(basic_deliver_timeout)
    end,
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Purges a queue while one of its two messages is checked out via
%% basic.get. The purge must report exactly one removed message and the
%% counters are expected to settle at 1/0/1.
purge(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    publish(Chan, Queue, [<<"msg1">>, <<"msg2">>]),
    wait_for_messages(Config, [[Queue, <<"2">>, <<"2">>, <<"0">>]]),
    [_Tag] = consume(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"2">>, <<"1">>, <<"1">>]]),
    Purge = #'queue.purge'{queue = Queue},
    #'queue.purge_ok'{message_count = 1} = amqp_channel:call(Chan, Purge),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"0">>, <<"1">>]]),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Fetches one message with basic.get, then issues basic.recover with
%% requeue; the queue counters are expected to return to their post-publish
%% values.
basic_recover(Config) ->
    Queue = ?config(queue_name, Config),
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    declare_queue(Chan, Config, Queue),

    AfterPublish = [[Queue, <<"1">>, <<"1">>, <<"0">>]],
    publish(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, AfterPublish),
    [_Tag] = consume(Chan, Queue, [<<"msg1">>]),
    wait_for_messages(Config, [[Queue, <<"1">>, <<"0">>, <<"1">>]]),
    Recover = #'basic.recover'{requeue = true},
    amqp_channel:cast(Chan, Recover),
    wait_for_messages(Config, AfterPublish),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Runs rabbit_amqqueue:delete_immediately/1 on the queue's pid through
%% `rabbitmqctl eval' and expects the output to mention an error. A passive
%% re-declare must then still find the queue, empty and without consumers.
delete_immediately_by_pid_fails(Config) ->
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QueueArgs = ?config(queue_args, Config),
    IsDurable = ?config(queue_durable, Config),
    Queue = ?config(queue_name, Config),
    declare_queue(Chan, Config, Queue),

    Expr = "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(Queue) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid]).",
    {ok, Output} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["eval", Expr]),
    ?assertEqual(match, re:run(Output, ".*error.*", [{capture, none}])),

    PassiveDeclare = #'queue.declare'{queue     = Queue,
                                      durable   = IsDurable,
                                      passive   = true,
                                      auto_delete = false,
                                      arguments = QueueArgs},
    ?assertEqual(#'queue.declare_ok'{queue = Queue,
                                     message_count = 0,
                                     consumer_count = 0},
                 amqp_channel:call(Chan, PassiveDeclare)),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Runs rabbit_amqqueue:delete_immediately/1 on the queue's pid through
%% `rabbitmqctl eval' and expects the output to contain "ok". A passive
%% re-declare must then fail with a server-initiated 404 channel close.
delete_immediately_by_pid_succeeds(Config) ->
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QueueArgs = ?config(queue_args, Config),
    IsDurable = ?config(queue_durable, Config),
    Queue = ?config(queue_name, Config),
    declare_queue(Chan, Config, Queue),

    Expr = "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(Queue) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid]).",
    {ok, Output} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["eval", Expr]),
    ?assertEqual(match, re:run(Output, ".*ok.*", [{capture, none}])),

    PassiveDeclare = #'queue.declare'{queue     = Queue,
                                      durable   = IsDurable,
                                      passive   = true,
                                      auto_delete = false,
                                      arguments = QueueArgs},
    ?assertExit(
       {{shutdown, {server_initiated_close, 404, _}}, _},
       amqp_channel:call(Chan, PassiveDeclare)),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%% Deletes the queue by resource name via
%% rabbit_amqqueue:delete_immediately_by_resource/1 run through
%% `rabbitmqctl eval', which must print exactly "ok\n". A passive re-declare
%% must then fail with a server-initiated 404 channel close.
delete_immediately_by_resource(Config) ->
    {_Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
    QueueArgs = ?config(queue_args, Config),
    IsDurable = ?config(queue_durable, Config),
    Queue = ?config(queue_name, Config),
    declare_queue(Chan, Config, Queue),

    Expr = "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(Queue) ++ "\">>)]).",
    ?assertEqual({ok, "ok\n"},
                 rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["eval", Expr])),

    PassiveDeclare = #'queue.declare'{queue     = Queue,
                                      durable   = IsDurable,
                                      passive   = true,
                                      auto_delete = false,
                                      arguments = QueueArgs},
    ?assertExit(
       {{shutdown, {server_initiated_close, 404, _}}, _},
       amqp_channel:call(Chan, PassiveDeclare)),
    rabbit_ct_client_helpers:close_channel(Chan),
    ok.

%%%%%%%%%%%%%%%%%%%%%%%%
%% Test helpers
%%%%%%%%%%%%%%%%%%%%%%%%
%% Declares QName on channel Ch using the `queue_args' and `queue_durable'
%% values stored in Config by the group setup. Crashes with badmatch unless
%% the broker answers queue.declare_ok.
declare_queue(Ch, Config, QName) ->
    Declare = #'queue.declare'{queue = QName,
                               arguments = ?config(queue_args, Config),
                               durable = ?config(queue_durable, Config)},
    #'queue.declare_ok'{} = amqp_channel:call(Ch, Declare).

%% Publishes every payload in Payloads with QName as the routing key (no
%% exchange is set on the method) and returns the list of per-publish
%% results.
publish(Ch, QName, Payloads) ->
    Method = #'basic.publish'{routing_key = QName},
    [amqp_channel:call(Ch, Method, #amqp_msg{payload = Body})
     || Body <- Payloads].

%% Same as consume/4 with auto-ack disabled, so the returned delivery tags
%% are still outstanding when this returns.
consume(Ch, QName, Payloads) ->
    NoAck = false,
    consume(Ch, QName, NoAck, Payloads).

%% Issues one basic.get per expected payload, in order, and returns the
%% delivery tags. Each reply must be a basic.get_ok carrying exactly the
%% expected payload; anything else crashes with badmatch.
consume(Ch, QName, NoAck, Payloads) ->
    Get = #'basic.get'{queue = QName,
                       no_ack = NoAck},
    FetchOne = fun(Expected) ->
                       %% Expected is already bound, so this match also
                       %% asserts the payload of the fetched message.
                       {#'basic.get_ok'{delivery_tag = Tag},
                        #amqp_msg{payload = Expected}} =
                           amqp_channel:call(Ch, Get),
                       Tag
               end,
    [FetchOne(Payload) || Payload <- Payloads].

%% Asserts that a basic.get on QName answers basic.get_empty.
consume_empty(Ch, QName) ->
    Get = #'basic.get'{queue = QName},
    ?assertMatch(#'basic.get_empty'{}, amqp_channel:call(Ch, Get)).

%% Same as subscribe/4 with the fixed consumer tag <<"ctag">>.
subscribe(Ch, Queue, NoAck) ->
    DefaultTag = <<"ctag">>,
    subscribe(Ch, Queue, NoAck, DefaultTag).

%% Registers the calling process as a consumer of Queue on channel Ch with
%% consumer tag Ctag, then waits for the matching basic.consume_ok message
%% in the caller's mailbox.
%%
%% Returns `ok'. Exits with `basic_consume_timeout' if the confirmation does
%% not arrive within ?TIMEOUT milliseconds, rather than blocking until the
%% suite timetrap fires.
subscribe(Ch, Queue, NoAck, Ctag) ->
    amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
                                                no_ack = NoAck,
                                                consumer_tag = Ctag},
                           self()),
    receive
        #'basic.consume_ok'{consumer_tag = Ctag} ->
             ok
    after ?TIMEOUT ->
            exit(basic_consume_timeout)
    end.

%% Waits for the next basic.deliver in the caller's mailbox whose
%% `redelivered' flag equals Redelivered and returns `ok'. Deliveries with
%% the other flag value are left in the mailbox.
%%
%% Exits with `basic_deliver_timeout' if no matching delivery arrives within
%% ?TIMEOUT milliseconds, rather than blocking until the suite timetrap
%% fires.
receive_basic_deliver(Redelivered) ->
    receive
        {#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
            ok
    after ?TIMEOUT ->
            exit(basic_deliver_timeout)
    end.

%% Drains the caller's mailbox, logging every message through ct:pal/2.
%% Returns `ok' once no message has arrived for T milliseconds.
flush(IdleMs) ->
    receive
        Msg ->
            ct:pal("flushed ~w", [Msg]),
            flush(IdleMs)
    after IdleMs ->
            ok
    end.