summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-19 15:27:04 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-19 15:27:04 +0100
commit938bb6e88a9538d109448362006c03ef1338af24 (patch)
tree08985e61408dcdcac2526263f51c84b2ee51d2bc /src
parente3ecb0f03bf0a0ecaa5cd8137339c3f9eecabb94 (diff)
downloadrabbitmq-server-git-938bb6e88a9538d109448362006c03ef1338af24.tar.gz
refactor msg_store starting/stopping in tests
using the new funs in vq
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl82
1 files changed, 27 insertions, 55 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 34ff55feca..54a2cc8cbf 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1354,28 +1354,10 @@ test_backing_queue() ->
passed
end.
-start_msg_store_empty() ->
- start_msg_store(undefined, {fun (ok) -> finished end, ok}).
-
-start_msg_store(ClientRefs, StartupFunState) ->
- ok = rabbit_sup:start_child(
- ?PERSISTENT_MSG_STORE, rabbit_msg_store,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), ClientRefs,
- StartupFunState]),
- ok = rabbit_sup:start_child(
- ?TRANSIENT_MSG_STORE, rabbit_msg_store,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), undefined,
- {fun (ok) -> finished end, ok}]).
-
-stop_msg_store() ->
- case supervisor:terminate_child(rabbit_sup, ?PERSISTENT_MSG_STORE) of
- ok -> supervisor:delete_child(rabbit_sup, ?PERSISTENT_MSG_STORE),
- case supervisor:terminate_child(rabbit_sup, ?TRANSIENT_MSG_STORE) of
- ok -> supervisor:delete_child(rabbit_sup, ?TRANSIENT_MSG_STORE);
- E -> {transient, E}
- end;
- E -> {persistent, E}
- end.
+restart_msg_store_empty() ->
+ ok = rabbit_variable_queue:stop_msg_store(),
+ ok = rabbit_variable_queue:start_msg_store(
+ undefined, {fun (ok) -> finished end, ok}).
guid_bin(X) ->
erlang:md5(term_to_binary(X)).
@@ -1422,8 +1404,7 @@ foreach_with_msg_store_client(Store, Ref, Fun, L) ->
rabbit_msg_store:client_init(Store, Ref), L)).
test_msg_store() ->
- stop_msg_store(),
- ok = start_msg_store_empty(),
+ restart_msg_store_empty(),
Self = self(),
Guids = [guid_bin(M) || M <- lists:seq(1,100)],
{Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
@@ -1485,22 +1466,22 @@ test_msg_store() ->
MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% stop and restart, preserving every other msg in 2nd half
- ok = stop_msg_store(),
- ok = start_msg_store([], {fun ([]) -> finished;
- ([Guid|GuidsTail])
- when length(GuidsTail) rem 2 == 0 ->
- {Guid, 1, GuidsTail};
- ([Guid|GuidsTail]) ->
- {Guid, 0, GuidsTail}
- end, Guids2ndHalf}),
+ ok = rabbit_variable_queue:stop_msg_store(),
+ ok = rabbit_variable_queue:start_msg_store(
+ [], {fun ([]) -> finished;
+ ([Guid|GuidsTail])
+ when length(GuidsTail) rem 2 == 0 ->
+ {Guid, 1, GuidsTail};
+ ([Guid|GuidsTail]) ->
+ {Guid, 0, GuidsTail}
+ end, Guids2ndHalf}),
%% check we have the right msgs left
lists:foldl(
fun (Guid, Bool) ->
not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid))
end, false, Guids2ndHalf),
%% restart empty
- ok = stop_msg_store(),
- ok = start_msg_store_empty(),
+ restart_msg_store_empty(),
%% check we don't contain any of the msgs
false = msg_store_contains(false, Guids),
%% publish the first half again
@@ -1511,8 +1492,7 @@ test_msg_store() ->
msg_store_read(Guids1stHalf, MSCState9)),
ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
%% restart empty
- ok = stop_msg_store(),
- ok = start_msg_store_empty(), %% now safe to reuse guids
+ restart_msg_store_empty(), %% now safe to reuse guids
%% push a lot of msgs in... at least 100 files worth
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
PayloadSizeBits = 65536,
@@ -1545,8 +1525,7 @@ test_msg_store() ->
%% ensure empty
false = msg_store_contains(false, GuidsBig),
%% restart empty
- ok = stop_msg_store(),
- ok = start_msg_store_empty(),
+ restart_msg_store_empty(),
passed.
queue_name(Name) ->
@@ -1556,6 +1535,7 @@ test_queue() ->
queue_name(<<"test">>).
empty_test_queue() ->
+ rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([]),
{0, _Terms, Qi1} = test_queue_init(),
_Qi2 = rabbit_queue_index:delete_and_terminate(Qi1),
@@ -1601,7 +1581,6 @@ test_queue_index() ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
TwoSegs = SegmentSize + SegmentSize,
MostOfASegment = trunc(SegmentSize*0.75),
- stop_msg_store(),
ok = empty_test_queue(),
SeqIdsA = lists:seq(0, MostOfASegment-1),
SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment),
@@ -1613,7 +1592,7 @@ test_queue_index() ->
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
_Qi5 = rabbit_queue_index:terminate([], Qi4),
- ok = stop_msg_store(),
+ rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0, as all the msgs were transient
{0, _Terms1, Qi6} = test_queue_init(),
@@ -1624,7 +1603,7 @@ test_queue_index() ->
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsGuidsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
- ok = stop_msg_store(),
+ rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
@@ -1639,12 +1618,11 @@ test_queue_index() ->
%% Everything will have gone now because #pubs == #acks
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
_Qi19 = rabbit_queue_index:terminate([], Qi18),
- ok = stop_msg_store(),
+ rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
%% should get length back as 0 because all persistent msgs have been acked
{0, _Terms3, Qi20} = test_queue_init(),
_Qi21 = rabbit_queue_index:delete_and_terminate(Qi20),
- ok = stop_msg_store(),
ok = empty_test_queue(),
%% These next bits are just to hit the auto deletion of segment files.
@@ -1658,7 +1636,6 @@ test_queue_index() ->
Qi26 = rabbit_queue_index:flush(Qi25),
{Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26),
_Qi28 = rabbit_queue_index:delete_and_terminate(Qi27),
- ok = stop_msg_store(),
ok = empty_test_queue(),
%% b) partial pub+del, then move to new segment, then ack all in old segment
@@ -1669,7 +1646,6 @@ test_queue_index() ->
Qi33 = rabbit_queue_index:ack(SeqIdsC, Qi32),
Qi34 = rabbit_queue_index:flush(Qi33),
_Qi35 = rabbit_queue_index:delete_and_terminate(Qi34),
- ok = stop_msg_store(),
ok = empty_test_queue(),
%% c) just fill up several segments of all pubs, then +dels, then +acks
@@ -1680,7 +1656,6 @@ test_queue_index() ->
Qi39 = rabbit_queue_index:ack(SeqIdsD, Qi38),
Qi40 = rabbit_queue_index:flush(Qi39),
_Qi41 = rabbit_queue_index:delete_and_terminate(Qi40),
- ok = stop_msg_store(),
ok = empty_test_queue(),
%% d) get messages in all states to a segment, then flush, then do
@@ -1700,7 +1675,6 @@ test_queue_index() ->
{ReadE, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]),
_Qi53 = rabbit_queue_index:delete_and_terminate(Qi52),
- ok = stop_msg_store(),
ok = empty_test_queue(),
%% e) as for (d), but use terminate instead of read, which will
@@ -1710,21 +1684,21 @@ test_queue_index() ->
Qi56 = rabbit_queue_index:deliver([0,1,4], Qi55),
Qi57 = rabbit_queue_index:ack([0], Qi56),
_Qi58 = rabbit_queue_index:terminate([], Qi57),
- ok = stop_msg_store(),
+ rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
{5, _Terms9, Qi59} = test_queue_init(),
{Qi60, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi59),
Qi61 = rabbit_queue_index:deliver([2,3,5,6], Qi60),
Qi62 = rabbit_queue_index:ack([1,2,3], Qi61),
_Qi63 = rabbit_queue_index:terminate([], Qi62),
- ok = stop_msg_store(),
+ rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([test_queue()]),
{5, _Terms10, Qi64} = test_queue_init(),
_Qi65 = rabbit_queue_index:delete_and_terminate(Qi64),
- ok = stop_msg_store(),
+ rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start([]),
- ok = stop_msg_store(),
+
passed.
variable_queue_publish(IsPersistent, Count, VQ) ->
@@ -1753,7 +1727,6 @@ assert_prop(List, Prop, Value) ->
Value = proplists:get_value(Prop, List).
fresh_variable_queue() ->
- stop_msg_store(),
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false),
S0 = rabbit_variable_queue:status(VQ),
@@ -1890,9 +1863,8 @@ test_queue_recover() ->
receive {'DOWN', MRef, process, QPid, _Info} -> ok
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
- ok = stop_msg_store(),
- ok = supervisor:terminate_child(rabbit_sup, rabbit_amqqueue_sup),
- ok = supervisor:delete_child(rabbit_sup, rabbit_amqqueue_sup),
+ rabbit_variable_queue:stop_msg_store(),
+ rabbit_amqqueue:stop(),
ok = rabbit_amqqueue:start(),
rabbit_amqqueue:with_or_die(
QName,