summaryrefslogtreecommitdiff
path: root/test/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2015-04-26 14:49:41 +0300
committerMichael Klishin <michael@novemberain.com>2015-04-26 14:49:41 +0300
commit2467046fb2739e5ffc0aa5a151339615326d8683 (patch)
treef1d6ac964688de8c74b2a9db798f2c6f6e33208d /test/src
parentc169604de6f02f9703bffc8b1637f02ad22d9151 (diff)
parentee0945aae3b7cffcfbf6f993d488cde08be17e7b (diff)
downloadrabbitmq-server-git-2467046fb2739e5ffc0aa5a151339615326d8683.tar.gz
Merge pull request #54 from alexethomas/sla_tracking_v2
Track and expose the timestamp property of the first msg in a queue
Diffstat (limited to 'test/src')
-rw-r--r--test/src/rabbit_tests.erl85
1 files changed, 74 insertions, 11 deletions
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index 0774dd9ab5..a39aa81e79 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -13,7 +13,6 @@
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%
-
-module(rabbit_tests).
-compile([export_all]).
@@ -64,7 +63,8 @@ all_tests0() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
- passed = test_statistics(),
+ passed = test_ch_statistics(),
+ passed = test_head_message_timestamp_statistic(),
passed = test_arguments_parser(),
passed = test_dynamic_mirroring(),
passed = test_user_management(),
@@ -89,7 +89,6 @@ all_tests0() ->
passed = vm_memory_monitor_tests:all_tests(),
passed.
-
do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
@@ -1440,21 +1439,21 @@ test_statistics_event_receiver(Pid) ->
Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
end.
-test_statistics_receive_event(Ch, Matcher) ->
+test_ch_statistics_receive_event(Ch, Matcher) ->
rabbit_channel:flush(Ch),
Ch ! emit_stats,
- test_statistics_receive_event1(Ch, Matcher).
+ test_ch_statistics_receive_event1(Ch, Matcher).
-test_statistics_receive_event1(Ch, Matcher) ->
+test_ch_statistics_receive_event1(Ch, Matcher) ->
receive #event{type = channel_stats, props = Props} ->
case Matcher(Props) of
true -> Props;
- _ -> test_statistics_receive_event1(Ch, Matcher)
+ _ -> test_ch_statistics_receive_event1(Ch, Matcher)
end
after ?TIMEOUT -> throw(failed_to_receive_event)
end.
-test_statistics() ->
+test_ch_statistics() ->
application:set_env(rabbit, collect_statistics, fine),
%% ATM this just tests the queue / exchange stats in channels. That's
@@ -1472,7 +1471,7 @@ test_statistics() ->
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
%% Check stats empty
- Event = test_statistics_receive_event(Ch, fun (_) -> true end),
+ Event = test_ch_statistics_receive_event(Ch, fun (_) -> true end),
[] = proplists:get_value(channel_queue_stats, Event),
[] = proplists:get_value(channel_exchange_stats, Event),
[] = proplists:get_value(channel_queue_exchange_stats, Event),
@@ -1484,7 +1483,7 @@ test_statistics() ->
rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
%% Check the stats reflect that
- Event2 = test_statistics_receive_event(
+ Event2 = test_ch_statistics_receive_event(
Ch,
fun (E) ->
length(proplists:get_value(
@@ -1497,7 +1496,7 @@ test_statistics() ->
%% Check the stats remove stuff on queue deletion
rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
- Event3 = test_statistics_receive_event(
+ Event3 = test_ch_statistics_receive_event(
Ch,
fun (E) ->
length(proplists:get_value(
@@ -1512,6 +1511,70 @@ test_statistics() ->
rabbit_tests_event_receiver:stop(),
passed.
+test_queue_statistics_receive_event(Q, Matcher) ->
+ %% Q ! emit_stats,
+ test_queue_statistics_receive_event1(Q, Matcher).
+
+test_queue_statistics_receive_event1(Q, Matcher) ->
+ receive #event{type = queue_stats, props = Props} ->
+ case Matcher(Props) of
+ true -> Props;
+ _ -> test_queue_statistics_receive_event1(Q, Matcher)
+ end
+ after ?TIMEOUT -> throw(failed_to_receive_event)
+ end.
+
+test_head_message_timestamp_statistic() ->
+ %% Can't find a way to receive the ack here so can't test pending acks status
+
+ application:set_env(rabbit, collect_statistics, fine),
+
+ %% Set up a channel and queue
+ {_Writer, Ch} = test_spawn(),
+ rabbit_channel:do(Ch, #'queue.declare'{}),
+ QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
+ X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
+
+ {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+ QPid = Q1#amqqueue.pid,
+
+ %% Set up event receiver for queue
+ rabbit_tests_event_receiver:start(self(), [node()], [queue_stats]),
+
+ %% Check timestamp is empty when queue is empty
+ Event1 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ '' = proplists:get_value(head_message_timestamp, Event1),
+
+ %% Publish two messages and check timestamp is that of first message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{timestamp = 1}, <<"">>)),
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{timestamp = 2}, <<"">>)),
+ Event2 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ 1 = proplists:get_value(head_message_timestamp, Event2),
+
+ %% Get first message and check timestamp is that of second message
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}),
+ Event3 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ 2 = proplists:get_value(head_message_timestamp, Event3),
+
+ %% Get second message and check timestamp is empty again
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}),
+ Event4 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ '' = proplists:get_value(head_message_timestamp, Event1),
+
+ %% Teardown
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
+ rabbit_channel:shutdown(Ch),
+ rabbit_tests_event_receiver:stop(),
+
+ passed.
+
test_refresh_events(SecondaryNode) ->
rabbit_tests_event_receiver:start(self(), [node(), SecondaryNode],
[channel_created, queue_created]),