diff options
| author | Michael Klishin <michael@novemberain.com> | 2015-10-12 20:01:11 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2015-10-12 20:01:11 +0300 |
| commit | 44a0ddb72dc0337235bcecf63878658bac4288a4 (patch) | |
| tree | 0754e646b74ac3290836fe391f446e98904e64f3 /test | |
| parent | 63cc2bb9391a34901de50b092f59045b1daf9289 (diff) | |
| parent | dc72935607e5bf6c563556139ea6992899b8520a (diff) | |
| download | rabbitmq-server-git-44a0ddb72dc0337235bcecf63878658bac4288a4.tar.gz | |
Merge pull request #344 from rabbitmq/rabbitmq-server-336
Implements Mirror Queue Sync in Batches
Diffstat (limited to 'test')
| -rw-r--r-- | test/src/rabbit_tests.erl | 67 |
1 files changed, 66 insertions, 1 deletions
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index c056c280fa..7fbfea2a4b 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -2557,6 +2557,57 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> false, self(), noflow, VQN) end, VQ, lists:seq(Start, Start + Count - 1))). +variable_queue_batch_publish(IsPersistent, Count, VQ) -> + variable_queue_batch_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_batch_publish(IsPersistent, Count, PropFun, VQ) -> + variable_queue_batch_publish(IsPersistent, 1, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_batch_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> + variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, + PayloadFun, fun make_publish/4, VQ). + +variable_queue_batch_publish_delivered(IsPersistent, Count, VQ) -> + variable_queue_batch_publish_delivered(IsPersistent, Count, fun (_N, P) -> P end, VQ). + +variable_queue_batch_publish_delivered(IsPersistent, Count, PropFun, VQ) -> + variable_queue_batch_publish_delivered(IsPersistent, 1, Count, PropFun, + fun (_N) -> <<>> end, VQ). + +variable_queue_batch_publish_delivered(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> + variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, + PayloadFun, fun make_publish_delivered/4, VQ). + +variable_queue_batch_publish0(IsPersistent, Start, Count, PropFun, PayloadFun, + MakePubFun, VQ) -> + Publishes = + [MakePubFun(IsPersistent, PayloadFun, PropFun, N) + || N <- lists:seq(Start, Start + Count - 1)], + VQ1 = rabbit_variable_queue:batch_publish(Publishes, self(), noflow, VQ), + variable_queue_wait_for_shuffling_end(VQ1). + +make_publish(IsPersistent, PayloadFun, PropFun, N) -> + {rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, + PayloadFun(N)), + PropFun(N, #message_properties{size = 10}), + false}. + +make_publish_delivered(IsPersistent, PayloadFun, PropFun, N) -> + {rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, + PayloadFun(N)), + PropFun(N, #message_properties{size = 10})}. + variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, @@ -2652,9 +2703,23 @@ test_variable_queue() -> fun test_variable_queue_purge/1, fun test_variable_queue_requeue/1, fun test_variable_queue_requeue_ram_beta/1, - fun test_variable_queue_fold/1]], + fun test_variable_queue_fold/1, + fun test_variable_queue_batch_publish/1, + fun test_variable_queue_batch_publish_delivered/1]], passed. +test_variable_queue_batch_publish(VQ) -> + Count = 10, + VQ1 = variable_queue_batch_publish(true, Count, VQ), + Count = rabbit_variable_queue:len(VQ1), + VQ1. + +test_variable_queue_batch_publish_delivered(VQ) -> + Count = 10, + VQ1 = variable_queue_batch_publish_delivered(true, Count, VQ), + Count = rabbit_variable_queue:len(VQ1), + VQ1. + test_variable_queue_fold(VQ0) -> {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), |
