summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2017-11-11 22:45:08 +0000
committerMichael Klishin <michael@clojurewerkz.org>2017-12-05 13:30:25 -0800
commitce6c81257753117c8c8653984a7c41bd29546ca4 (patch)
tree8822a77d1f78adc4499277845f0a770a671baa83 /test
parent9fd6620a58e3c51eeb6d2c3369242fbb2823bb2e (diff)
downloadrabbitmq-server-git-ce6c81257753117c8c8653984a7c41bd29546ca4.tar.gz
Add test for the application of message-ttl policy retroactively
Diffstat (limited to 'test')
-rw-r--r--test/policy_SUITE.erl57
1 files changed, 56 insertions, 1 deletions
diff --git a/test/policy_SUITE.erl b/test/policy_SUITE.erl
index 634f198858..2c41433a30 100644
--- a/test/policy_SUITE.erl
+++ b/test/policy_SUITE.erl
@@ -30,7 +30,9 @@ groups() ->
[
{cluster_size_2, [], [
policy_ttl,
- operator_policy_ttl
+ operator_policy_ttl,
+ operator_retroactive_policy_ttl,
+ operator_retroactive_policy_publish_ttl
]}
].
@@ -112,6 +114,49 @@ operator_policy_ttl(Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.
+operator_retroactive_policy_ttl(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Q = <<"policy_ttl-queue">>,
+ declare(Ch, Q),
+ publish(Ch, Q, lists:seq(1, 50)),
+ % Operator policy will override
+ rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"ttl-policy-op">>,
+ <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 1}]),
+
+ %% Old messages are not expired
+ timer:sleep(50),
+ get_messages(50, Ch, Q),
+ delete(Ch, Q),
+
+ rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"ttl-policy-op">>),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
+operator_retroactive_policy_publish_ttl(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Q = <<"policy_ttl-queue">>,
+ declare(Ch, Q),
+ publish(Ch, Q, lists:seq(1, 50)),
+ % Operator policy will override
+ rabbit_ct_broker_helpers:set_operator_policy(Config, 0, <<"ttl-policy-op">>,
+ <<"policy_ttl-queue">>, <<"all">>, [{<<"message-ttl">>, 1}]),
+
+ %% Old messages are not expired, new ones only expire when they get to the head of
+ %% the queue
+ publish(Ch, Q, lists:seq(1, 25)),
+ timer:sleep(50),
+ [[<<"policy_ttl-queue">>, <<"75">>]] = rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues"]),
+ get_messages(50, Ch, Q),
+ delete(Ch, Q),
+
+ rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"ttl-policy-op">>),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
%%----------------------------------------------------------------------------
@@ -154,4 +199,14 @@ consume(Ch, Q, Ack) ->
get_empty(Ch, Q) ->
#'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}).
+get_messages(0, Ch, Q) ->
+ get_empty(Ch, Q);
+get_messages(Number, Ch, Q) ->
+ case amqp_channel:call(Ch, #'basic.get'{queue = Q}) of
+ {#'basic.get_ok'{}, _} ->
+ get_messages(Number - 1, Ch, Q);
+ #'basic.get_empty'{} ->
+ exit(failed)
+ end.
+
%%----------------------------------------------------------------------------