summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-20 21:27:36 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-20 21:27:36 +0100
commit2e7912c51fed49017f5521d212b01a8dcdfc2f38 (patch)
treec1581795e03246e845d01e1fdcc6a8c22539ab9a
parentd6c6a98d51b2b7df136cf529d782d9eaf05b26cf (diff)
downloadrabbitmq-server-git-2e7912c51fed49017f5521d212b01a8dcdfc2f38.tar.gz
fix tests
-rw-r--r--src/rabbit_tests.erl143
1 files changed, 85 insertions, 58 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1b47cdb71c..29f175df2c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,8 +41,8 @@
-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
@@ -1430,17 +1430,17 @@ restart_msg_store_empty() ->
guid_bin(X) ->
erlang:md5(term_to_binary(X)).
-msg_store_contains(Atom, Guids) ->
+msg_store_contains(Atom, Guids, MSCState) ->
Atom = lists:foldl(
fun (Guid, Atom1) when Atom1 =:= Atom ->
- rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end,
+ rabbit_msg_store:contains(Guid, MSCState) end,
Atom, Guids).
-msg_store_sync(Guids) ->
+msg_store_sync(Guids, MSCState) ->
Ref = make_ref(),
Self = self(),
- ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids,
- fun () -> Self ! {sync, Ref} end),
+ ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end,
+ MSCState),
receive
{sync, Ref} -> ok
after
@@ -1452,55 +1452,64 @@ msg_store_sync(Guids) ->
msg_store_read(Guids, MSCState) ->
lists:foldl(fun (Guid, MSCStateM) ->
{{ok, Guid}, MSCStateN} = rabbit_msg_store:read(
- ?PERSISTENT_MSG_STORE,
Guid, MSCStateM),
MSCStateN
end, MSCState, Guids).
msg_store_write(Guids, MSCState) ->
lists:foldl(fun (Guid, {ok, MSCStateN}) ->
- rabbit_msg_store:write(?PERSISTENT_MSG_STORE,
- Guid, Guid, MSCStateN)
+ rabbit_msg_store:write(Guid, Guid, MSCStateN)
end, {ok, MSCState}, Guids).
-msg_store_remove(Guids) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids).
+msg_store_remove(Guids, MSCState) ->
+ rabbit_msg_store:remove(Guids, MSCState).
+
+msg_store_remove(MsgStore, Ref, Guids) ->
+ with_msg_store_client(MsgStore, Ref,
+ fun (MSCStateM) ->
+ ok = msg_store_remove(Guids, MSCStateM),
+ MSCStateM
+ end).
+
+with_msg_store_client(MsgStore, Ref, Fun) ->
+ rabbit_msg_store:client_terminate(
+ Fun(rabbit_msg_store:client_init(MsgStore, Ref))).
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
- lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
+ lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
+ rabbit_msg_store:client_init(MsgStore, Ref), L)).
test_msg_store() ->
restart_msg_store_empty(),
Self = self(),
Guids = [guid_bin(M) || M <- lists:seq(1,100)],
{Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
- %% check we don't contain any of the msgs we're about to publish
- false = msg_store_contains(false, Guids),
Ref = rabbit_guid:guid(),
MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ %% check we don't contain any of the msgs we're about to publish
+ false = msg_store_contains(false, Guids, MSCState),
%% publish the first half
{ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState),
%% sync on the first half
- ok = msg_store_sync(Guids1stHalf),
+ ok = msg_store_sync(Guids1stHalf, MSCState1),
%% publish the second half
{ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1),
%% sync on the first half again - the msg_store will be dirty, but
%% we won't need the fsync
- ok = msg_store_sync(Guids1stHalf),
+ ok = msg_store_sync(Guids1stHalf, MSCState1),
%% check they're all in there
- true = msg_store_contains(true, Guids),
+ true = msg_store_contains(true, Guids, MSCState1),
%% publish the latter half twice so we hit the caching and ref count code
{ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2),
%% check they're still all in there
- true = msg_store_contains(true, Guids),
+ true = msg_store_contains(true, Guids, MSCState2),
%% sync on the 2nd half, but do lots of individual syncs to try
%% and cause coalescing to happen
ok = lists:foldl(
fun (Guid, ok) -> rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE,
- [Guid], fun () -> Self ! {sync, Guid} end)
+ [Guid], fun () -> Self ! {sync, Guid} end,
+ MSCState3)
end, ok, Guids2ndHalf),
lists:foldl(
fun(Guid, ok) ->
@@ -1515,24 +1524,24 @@ test_msg_store() ->
end, ok, Guids2ndHalf),
%% it's very likely we're not dirty here, so the 1st half sync
%% should hit a different code path
- ok = msg_store_sync(Guids1stHalf),
+ ok = msg_store_sync(Guids1stHalf, MSCState3),
%% read them all
MSCState4 = msg_store_read(Guids, MSCState3),
%% read them all again - this will hit the cache, not disk
MSCState5 = msg_store_read(Guids, MSCState4),
%% remove them all
- ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids),
+ ok = rabbit_msg_store:remove(Guids, MSCState5),
%% check first half doesn't exist
- false = msg_store_contains(false, Guids1stHalf),
+ false = msg_store_contains(false, Guids1stHalf, MSCState5),
%% check second half does exist
- true = msg_store_contains(true, Guids2ndHalf),
+ true = msg_store_contains(true, Guids2ndHalf, MSCState5),
%% read the second half again
MSCState6 = msg_store_read(Guids2ndHalf, MSCState5),
%% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
+ ok = rabbit_msg_store:release(Guids2ndHalf, MSCState6),
%% read the second half again, just for fun (aka code coverage)
MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
- ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE),
+ ok = rabbit_msg_store:client_terminate(MSCState7),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
@@ -1543,22 +1552,26 @@ test_msg_store() ->
([Guid|GuidsTail]) ->
{Guid, 0, GuidsTail}
end, Guids2ndHalf}),
+ MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we have the right msgs left
lists:foldl(
fun (Guid, Bool) ->
- not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid))
+ not(Bool = rabbit_msg_store:contains(Guid, MSCState8))
end, false, Guids2ndHalf),
+ ok = rabbit_msg_store:client_terminate(MSCState8),
%% restart empty
restart_msg_store_empty(),
+ MSCState9 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs
- false = msg_store_contains(false, Guids),
+ false = msg_store_contains(false, Guids, MSCState9),
%% publish the first half again
- MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
- {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
+ {ok, MSCState10} = msg_store_write(Guids1stHalf, MSCState9),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE),
- ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
+ msg_store_read(Guids1stHalf, MSCState10)),
+ MSCState11 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ ok = rabbit_msg_store:remove(Guids1stHalf, MSCState11),
+ ok = rabbit_msg_store:client_terminate(MSCState11),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse guids
%% push a lot of msgs in... at least 100 files worth
@@ -1569,29 +1582,37 @@ test_msg_store() ->
Payload = << 0:PayloadSizeBits >>,
ok = foreach_with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
- fun (Guid, MsgStore, MSCStateM) ->
+ fun (Guid, MSCStateM) ->
{ok, MSCStateN} = rabbit_msg_store:write(
- MsgStore, Guid, Payload, MSCStateM),
+ Guid, Payload, MSCStateM),
MSCStateN
end, GuidsBig),
%% now read them to ensure we hit the fast client-side reading
ok = foreach_with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
- fun (Guid, MsgStore, MSCStateM) ->
+ fun (Guid, MSCStateM) ->
{{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
- MsgStore, Guid, MSCStateM),
+ Guid, MSCStateM),
MSCStateN
end, GuidsBig),
%% .., then 3s by 1...
- ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
+ [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
%% .., then remove 3s by 2, from the young end first. This hits
%% GC (under 50% good data left, but no empty files. Must GC).
- ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
+ [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
%% .., then remove 3s by 3, from the young end first. This hits
%% GC...
- ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
+ [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
%% ensure empty
- false = msg_store_contains(false, GuidsBig),
+ ok = with_msg_store_client(
+ ?PERSISTENT_MSG_STORE, Ref,
+ fun (MSCStateM) ->
+ false = msg_store_contains(false, GuidsBig, MSCStateM),
+ MSCStateM
+ end),
%% restart empty
restart_msg_store_empty(),
passed.
@@ -1603,11 +1624,18 @@ test_queue() ->
queue_name(<<"test">>).
init_test_queue() ->
- rabbit_queue_index:init(
- test_queue(), true, false,
- fun (Guid) ->
- rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
- end).
+ TestQueue = test_queue(),
+ Terms = rabbit_queue_index:shutdown_terms(TestQueue),
+ PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
+ PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
+ PRef),
+ Res = rabbit_queue_index:recover(
+ TestQueue, Terms, false,
+ fun (Guid) ->
+ rabbit_msg_store:contains(Guid, PersistentClient)
+ end),
+ ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
+ Res.
restart_test_queue(Qi) ->
_ = rabbit_queue_index:terminate([], Qi),
@@ -1618,13 +1646,13 @@ restart_test_queue(Qi) ->
empty_test_queue() ->
ok = rabbit_variable_queue:stop(),
ok = rabbit_variable_queue:start([]),
- {0, _Terms, Qi} = init_test_queue(),
+ {0, Qi} = init_test_queue(),
_ = rabbit_queue_index:delete_and_terminate(Qi),
ok.
with_empty_test_queue(Fun) ->
ok = empty_test_queue(),
- {0, _Terms, Qi} = init_test_queue(),
+ {0, Qi} = init_test_queue(),
rabbit_queue_index:delete_and_terminate(Fun(Qi)).
queue_index_publish(SeqIds, Persistent, Qi) ->
@@ -1639,12 +1667,11 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
Guid = rabbit_guid:guid(),
QiM = rabbit_queue_index:publish(
Guid, SeqId, Persistent, QiN),
- {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
- Guid, MSCStateN),
+ {ok, MSCStateM} = rabbit_msg_store:write(Guid, Guid,
+ MSCStateN),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
- ok = rabbit_msg_store:client_delete_and_terminate(
- MSCStateEnd, MsgStore, Ref),
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCStateEnd),
{A, B}.
verify_read_with_published(_Delivered, _Persistent, [], _) ->
@@ -1674,7 +1701,7 @@ test_queue_index() ->
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
%% should get length back as 0, as all the msgs were transient
- {0, _Terms1, Qi6} = restart_test_queue(Qi4),
+ {0, Qi6} = restart_test_queue(Qi4),
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
@@ -1683,7 +1710,7 @@ test_queue_index() ->
lists:reverse(SeqIdsGuidsB)),
%% should get length back as MostOfASegment
LenB = length(SeqIdsB),
- {LenB, _Terms2, Qi12} = restart_test_queue(Qi10),
+ {LenB, Qi12} = restart_test_queue(Qi10),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
@@ -1695,7 +1722,7 @@ test_queue_index() ->
{0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
%% should get length back as 0 because all persistent
%% msgs have been acked
- {0, _Terms3, Qi19} = restart_test_queue(Qi18),
+ {0, Qi19} = restart_test_queue(Qi18),
Qi19
end),
@@ -1767,11 +1794,11 @@ test_queue_index() ->
true, Qi0),
Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
Qi3 = rabbit_queue_index:ack([0], Qi2),
- {5, _Terms9, Qi4} = restart_test_queue(Qi3),
+ {5, Qi4} = restart_test_queue(Qi3),
{Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4),
Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
- {5, _Terms10, Qi8} = restart_test_queue(Qi7),
+ {5, Qi8} = restart_test_queue(Qi7),
Qi8
end),