summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-19 16:48:29 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-19 16:48:29 +0100
commit8aed74110a5a16b6076342b890f91847c0dd695c (patch)
tree7843c8567cdf4ff842c2d778c6ac13b289bb9866 /src
parente00a2c087ef5aa7008e6fe70a1c9585c49b74b8f (diff)
downloadrabbitmq-server-git-8aed74110a5a16b6076342b890f91847c0dd695c.tar.gz
Extending queue index test slightly
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl60
1 files changed, 39 insertions, 21 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5e74142a46..683d15c96a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1011,6 +1011,15 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
end, {Qi, []}, SeqIds).
+verify_read_with_published(_Delivered, _Persistent, [], _) ->
+ ok;
+verify_read_with_published(Delivered, Persistent,
+ [{MsgId, SeqId, Persistent, Delivered}|Read],
+ [{SeqId, MsgId}|Published]) ->
+ verify_read_with_published(Delivered, Persistent, Read, Published);
+verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
+ ko.
+
test_queue_index() ->
ok = empty_test_queue(),
SeqIdsA = lists:seq(1,10000),
@@ -1018,41 +1027,50 @@ test_queue_index() ->
{0, Qi0} = rabbit_queue_index:init(test_queue()),
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
- {Qi2, _SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
+ {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, 10001, Qi3} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2),
+ {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3),
+ ok = verify_read_with_published(false, false, ReadA,
+ lists:reverse(SeqIdsMsgIdsA)),
%% call terminate twice to prove it's idempotent
- _Qi4 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi3)),
+ _Qi5 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi4)),
ok = rabbit_msg_store:stop(),
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 0, as all the msgs were transient
- {0, Qi5} = rabbit_queue_index:init(test_queue()),
- {false, Qi6} = rabbit_queue_index:flush_journal(Qi5),
- {0, 10001, Qi7} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
- {Qi8, _SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
- {0, 20001, Qi9} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8),
- _Qi10 = rabbit_queue_index:terminate(Qi9),
+ {0, Qi6} = rabbit_queue_index:init(test_queue()),
+ {false, Qi7} = rabbit_queue_index:flush_journal(Qi6),
+ {0, 10001, Qi8} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi7),
+ {Qi9, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi8),
+ {0, 20001, Qi10} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi9),
+ {ReadB, Qi11} = rabbit_queue_index:read_segment_entries(0, Qi10),
+ ok = verify_read_with_published(false, true, ReadB,
+ lists:reverse(SeqIdsMsgIdsB)),
+ _Qi12 = rabbit_queue_index:terminate(Qi11),
ok = rabbit_msg_store:stop(),
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 10000
LenB = length(SeqIdsB),
- {LenB, Qi11} = rabbit_queue_index:init(test_queue()),
- {0, 20001, Qi12} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi11),
- Qi13 = lists:foldl(
+ {LenB, Qi13} = rabbit_queue_index:init(test_queue()),
+ {0, 20001, Qi14} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi13),
+ Qi15 = lists:foldl(
fun (SeqId, QiN) ->
rabbit_queue_index:write_delivered(SeqId, QiN)
- end, Qi12, SeqIdsB),
- Qi14 = rabbit_queue_index:write_acks(SeqIdsB, Qi13),
- {0, 20001, Qi15} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi14),
- _Qi16 = rabbit_queue_index:terminate(Qi15),
+ end, Qi14, SeqIdsB),
+ {ReadC, Qi16} = rabbit_queue_index:read_segment_entries(0, Qi15),
+ ok = verify_read_with_published(true, true, ReadC,
+ lists:reverse(SeqIdsMsgIdsB)),
+ Qi17 = rabbit_queue_index:write_acks(SeqIdsB, Qi16),
+ {0, 20001, Qi18} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
+ _Qi19 = rabbit_queue_index:terminate(Qi18),
ok = rabbit_msg_store:stop(),
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 0 because all persistent msgs have been acked
- {0, Qi17} = rabbit_queue_index:init(test_queue()),
- _Qi18 = rabbit_queue_index:terminate_and_erase(Qi17),
+ {0, Qi20} = rabbit_queue_index:init(test_queue()),
+ _Qi21 = rabbit_queue_index:terminate_and_erase(Qi20),
ok = rabbit_msg_store:stop(),
passed.