diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-20 21:27:36 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-20 21:27:36 +0100 |
| commit | 2e7912c51fed49017f5521d212b01a8dcdfc2f38 (patch) | |
| tree | c1581795e03246e845d01e1fdcc6a8c22539ab9a | |
| parent | d6c6a98d51b2b7df136cf529d782d9eaf05b26cf (diff) | |
| download | rabbitmq-server-git-2e7912c51fed49017f5521d212b01a8dcdfc2f38.tar.gz | |
fix tests
| -rw-r--r-- | src/rabbit_tests.erl | 143 |
1 files changed, 85 insertions, 58 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1b47cdb71c..29f175df2c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,8 +41,8 @@ -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -1430,17 +1430,17 @@ restart_msg_store_empty() -> guid_bin(X) -> erlang:md5(term_to_binary(X)). -msg_store_contains(Atom, Guids) -> +msg_store_contains(Atom, Guids, MSCState) -> Atom = lists:foldl( fun (Guid, Atom1) when Atom1 =:= Atom -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, + rabbit_msg_store:contains(Guid, MSCState) end, Atom, Guids). -msg_store_sync(Guids) -> +msg_store_sync(Guids, MSCState) -> Ref = make_ref(), Self = self(), - ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids, - fun () -> Self ! {sync, Ref} end), + ok = rabbit_msg_store:sync(Guids, fun () -> Self ! {sync, Ref} end, + MSCState), receive {sync, Ref} -> ok after @@ -1452,55 +1452,64 @@ msg_store_sync(Guids) -> msg_store_read(Guids, MSCState) -> lists:foldl(fun (Guid, MSCStateM) -> {{ok, Guid}, MSCStateN} = rabbit_msg_store:read( - ?PERSISTENT_MSG_STORE, Guid, MSCStateM), MSCStateN end, MSCState, Guids). msg_store_write(Guids, MSCState) -> lists:foldl(fun (Guid, {ok, MSCStateN}) -> - rabbit_msg_store:write(?PERSISTENT_MSG_STORE, - Guid, Guid, MSCStateN) + rabbit_msg_store:write(Guid, Guid, MSCStateN) end, {ok, MSCState}, Guids). -msg_store_remove(Guids) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids). +msg_store_remove(Guids, MSCState) -> + rabbit_msg_store:remove(Guids, MSCState). + +msg_store_remove(MsgStore, Ref, Guids) -> + with_msg_store_client(MsgStore, Ref, + fun (MSCStateM) -> + ok = msg_store_remove(Guids, MSCStateM), + MSCStateM + end). + +with_msg_store_client(MsgStore, Ref, Fun) -> + rabbit_msg_store:client_terminate( + Fun(rabbit_msg_store:client_init(MsgStore, Ref))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( - lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). + lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, + rabbit_msg_store:client_init(MsgStore, Ref), L)). test_msg_store() -> restart_msg_store_empty(), Self = self(), Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), - %% check we don't contain any of the msgs we're about to publish - false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + %% check we don't contain any of the msgs we're about to publish + false = msg_store_contains(false, Guids, MSCState), %% publish the first half {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState1), %% publish the second half {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1), %% sync on the first half again - the msg_store will be dirty, but %% we won't need the fsync - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState1), %% check they're all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState1), %% publish the latter half twice so we hit the caching and ref count code {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2), %% check they're still all in there - true = msg_store_contains(true, Guids), + true = msg_store_contains(true, Guids, MSCState2), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen ok = lists:foldl( fun (Guid, ok) -> rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, - [Guid], fun () -> Self ! {sync, Guid} end) + [Guid], fun () -> Self ! {sync, Guid} end, + MSCState3) end, ok, Guids2ndHalf), lists:foldl( fun(Guid, ok) -> @@ -1515,24 +1524,24 @@ test_msg_store() -> end, ok, Guids2ndHalf), %% it's very likely we're not dirty here, so the 1st half sync %% should hit a different code path - ok = msg_store_sync(Guids1stHalf), + ok = msg_store_sync(Guids1stHalf, MSCState3), %% read them all MSCState4 = msg_store_read(Guids, MSCState3), %% read them all again - this will hit the cache, not disk MSCState5 = msg_store_read(Guids, MSCState4), %% remove them all - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids), + ok = rabbit_msg_store:remove(Guids, MSCState5), %% check first half doesn't exist - false = msg_store_contains(false, Guids1stHalf), + false = msg_store_contains(false, Guids1stHalf, MSCState5), %% check second half does exist - true = msg_store_contains(true, Guids2ndHalf), + true = msg_store_contains(true, Guids2ndHalf, MSCState5), %% read the second half again MSCState6 = msg_store_read(Guids2ndHalf, MSCState5), %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), + ok = rabbit_msg_store:release(Guids2ndHalf, MSCState6), %% read the second half again, just for fun (aka code coverage) MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), + ok = rabbit_msg_store:client_terminate(MSCState7), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1543,22 +1552,26 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), + MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> - not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)) + not(Bool = rabbit_msg_store:contains(Guid, MSCState8)) end, false, Guids2ndHalf), + ok = rabbit_msg_store:client_terminate(MSCState8), %% restart empty restart_msg_store_empty(), + MSCState9 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs - false = msg_store_contains(false, Guids), + false = msg_store_contains(false, Guids, MSCState9), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), - {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), + {ok, MSCState10} = msg_store_write(Guids1stHalf, MSCState9), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), + msg_store_read(Guids1stHalf, MSCState10)), + MSCState11 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + ok = rabbit_msg_store:remove(Guids1stHalf, MSCState11), + ok = rabbit_msg_store:client_terminate(MSCState11), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids %% push a lot of msgs in... at least 100 files worth @@ -1569,29 +1582,37 @@ test_msg_store() -> Payload = << 0:PayloadSizeBits >>, ok = foreach_with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> + fun (Guid, MSCStateM) -> {ok, MSCStateN} = rabbit_msg_store:write( - MsgStore, Guid, Payload, MSCStateM), + Guid, Payload, MSCStateM), MSCStateN end, GuidsBig), %% now read them to ensure we hit the fast client-side reading ok = foreach_with_msg_store_client( ?PERSISTENT_MSG_STORE, Ref, - fun (Guid, MsgStore, MSCStateM) -> + fun (Guid, MSCStateM) -> {{ok, Payload}, MSCStateN} = rabbit_msg_store:read( - MsgStore, Guid, MSCStateM), + Guid, MSCStateM), MSCStateN end, GuidsBig), %% .., then 3s by 1... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]), %% .., then remove 3s by 3, from the young end first. This hits %% GC... - ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), + ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, + [guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]), %% ensure empty - false = msg_store_contains(false, GuidsBig), + ok = with_msg_store_client( + ?PERSISTENT_MSG_STORE, Ref, + fun (MSCStateM) -> + false = msg_store_contains(false, GuidsBig, MSCStateM), + MSCStateM + end), %% restart empty restart_msg_store_empty(), passed. @@ -1603,11 +1624,18 @@ test_queue() -> queue_name(<<"test">>). init_test_queue() -> - rabbit_queue_index:init( - test_queue(), true, false, - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end). + TestQueue = test_queue(), + Terms = rabbit_queue_index:shutdown_terms(TestQueue), + PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + Res = rabbit_queue_index:recover( + TestQueue, Terms, false, + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), + Res. restart_test_queue(Qi) -> _ = rabbit_queue_index:terminate([], Qi), @@ -1618,13 +1646,13 @@ restart_test_queue(Qi) -> empty_test_queue() -> ok = rabbit_variable_queue:stop(), ok = rabbit_variable_queue:start([]), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), _ = rabbit_queue_index:delete_and_terminate(Qi), ok. with_empty_test_queue(Fun) -> ok = empty_test_queue(), - {0, _Terms, Qi} = init_test_queue(), + {0, Qi} = init_test_queue(), rabbit_queue_index:delete_and_terminate(Fun(Qi)). queue_index_publish(SeqIds, Persistent, Qi) -> @@ -1639,12 +1667,11 @@ queue_index_publish(SeqIds, Persistent, Qi) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( Guid, SeqId, Persistent, QiN), - {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, - Guid, MSCStateN), + {ok, MSCStateM} = rabbit_msg_store:write(Guid, Guid, + MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), - ok = rabbit_msg_store:client_delete_and_terminate( - MSCStateEnd, MsgStore, Ref), + ok = rabbit_msg_store:client_delete_and_terminate(MSCStateEnd), {A, B}. verify_read_with_published(_Delivered, _Persistent, [], _) -> @@ -1674,7 +1701,7 @@ test_queue_index() -> ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), %% should get length back as 0, as all the msgs were transient - {0, _Terms1, Qi6} = restart_test_queue(Qi4), + {0, Qi6} = restart_test_queue(Qi4), {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), @@ -1683,7 +1710,7 @@ test_queue_index() -> lists:reverse(SeqIdsGuidsB)), %% should get length back as MostOfASegment LenB = length(SeqIdsB), - {LenB, _Terms2, Qi12} = restart_test_queue(Qi10), + {LenB, Qi12} = restart_test_queue(Qi10), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), @@ -1695,7 +1722,7 @@ test_queue_index() -> {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17), %% should get length back as 0 because all persistent %% msgs have been acked - {0, _Terms3, Qi19} = restart_test_queue(Qi18), + {0, Qi19} = restart_test_queue(Qi18), Qi19 end), @@ -1767,11 +1794,11 @@ test_queue_index() -> true, Qi0), Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1), Qi3 = rabbit_queue_index:ack([0], Qi2), - {5, _Terms9, Qi4} = restart_test_queue(Qi3), + {5, Qi4} = restart_test_queue(Qi3), {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4), Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5), Qi7 = rabbit_queue_index:ack([1,2,3], Qi6), - {5, _Terms10, Qi8} = restart_test_queue(Qi7), + {5, Qi8} = restart_test_queue(Qi7), Qi8 end), |
