summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-19 15:57:03 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-19 15:57:03 +0100
commit7e2848234bf3ab1500db09dfcc555c44e2c273f9 (patch)
tree8f20578ca71d7fe422e275c271cd604a113b8038 /src
parentcb7542b5afd0f7bc1bdbeadd48299a4bdf3720b9 (diff)
downloadrabbitmq-server-git-7e2848234bf3ab1500db09dfcc555c44e2c273f9.tar.gz
Some minor API changes which are pretty sensible anyway, but also make writing tests much easier. Also, tests for queue_index which hit 90% code coverage for the module. Profiling in progress to try and figure out why it's not quite as blazingly fast as I expected.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_queue_index.erl39
-rw-r--r--src/rabbit_tests.erl72
-rw-r--r--src/rabbit_variable_queue.erl20
4 files changed, 106 insertions, 28 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a5e59ce22a..215c1bc421 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -156,7 +156,8 @@ start(normal, []) ->
ok = maybe_insert_default_data(),
ok = rabbit_exchange:recover(),
%% TODO - this should probably use start_child somehow too
- {ok, DurableQueues} = rabbit_queue_index:start_msg_store(),
+ DurableQueues = rabbit_amqqueue:find_durable_queues(),
+ ok = rabbit_queue_index:start_msg_store(DurableQueues),
{ok, _RealDurableQueues} = rabbit_amqqueue:recover(DurableQueues)
%% TODO - RealDurableQueues is a subset of
%% DurableQueues. It may have queues removed which
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index dac174026c..3471913f65 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -34,7 +34,7 @@
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
write_delivered/2, write_acks/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
- find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/0]).
+ find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
%%----------------------------------------------------------------------------
%% The queue disk index
@@ -149,8 +149,8 @@
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(segment_size/0 :: () -> non_neg_integer()).
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
- {non_neg_integer(), non_neg_integer()}).
--spec(start_msg_store/0 :: () -> {'ok', [amqqueue()]}).
+ {non_neg_integer(), non_neg_integer(), qistate()}).
+-spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok').
-endif.
@@ -283,7 +283,7 @@ segment_size() ->
?SEGMENT_ENTRIES_COUNT.
find_lowest_seq_id_seg_and_next_seq_id(
- #qistate { dir = Dir, journal_ack_dict = JAckDict }) ->
+ State = #qistate { dir = Dir, journal_ack_dict = JAckDict }) ->
SegNumsPaths = all_segment_nums_paths(Dir),
%% We don't want the lowest seq_id, merely the seq_id of the start
%% of the lowest segment. That seq_id may not actually exist, but
@@ -295,18 +295,18 @@ find_lowest_seq_id_seg_and_next_seq_id(
_ -> {SegNum1, _SegPath1} = lists:min(SegNumsPaths),
reconstruct_seq_id(SegNum1, 0)
end,
- NextSeqId =
+ {NextSeqId, State1} =
case SegNumsPaths of
- [] -> 0;
+ [] -> {0, State};
_ -> {SegNum2, SegPath2} = lists:max(SegNumsPaths),
+ State2 = close_file_handle_for_seg(SegNum2, State),
{_SDict, _AckCount, HighRelSeq} =
load_segment(SegNum2, SegPath2, JAckDict),
- 1 + reconstruct_seq_id(SegNum2, HighRelSeq)
+ {1 + reconstruct_seq_id(SegNum2, HighRelSeq), State2}
end,
- {LowSeqIdSeg, NextSeqId}.
+ {LowSeqIdSeg, NextSeqId, State1}.
-start_msg_store() ->
- DurableQueues = rabbit_amqqueue:find_durable_queues(),
+start_msg_store(DurableQueues) ->
DurableDict =
dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name),
Queue #amqqueue.name} || Queue <- DurableQueues ]),
@@ -339,7 +339,7 @@ start_msg_store() ->
Dir = filename:join(queues_dir(), DirName),
ok = delete_queue_directory(Dir)
end, TransientDirs),
- {ok, DurableQueues}.
+ ok.
%%----------------------------------------------------------------------------
%% Minor Helpers
@@ -375,7 +375,8 @@ get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) ->
State1 = #qistate { dir = Dir } =
close_file_handle_for_seg(CurSegNum, State),
{ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum),
- [binary, raw, write, delayed_write, read]),
+ [binary, raw, read, write,
+ {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]),
{ok, _} = file:position(Hdl, {eof, 0}),
{Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}.
@@ -410,8 +411,9 @@ queue_index_walker([]) ->
finished;
queue_index_walker([QueueName|QueueNames]) ->
{TotalMsgCount, State} = init(QueueName),
- {LowSeqIdSeg, _NextSeqId} = find_lowest_seq_id_seg_and_next_seq_id(State),
- queue_index_walker({TotalMsgCount, LowSeqIdSeg, State, QueueNames});
+ {LowSeqIdSeg, _NextSeqId, State1} =
+ find_lowest_seq_id_seg_and_next_seq_id(State),
+ queue_index_walker({TotalMsgCount, LowSeqIdSeg, State1, QueueNames});
queue_index_walker({0, _LowSeqIdSeg, State, QueueNames}) ->
terminate(State),
@@ -510,7 +512,8 @@ deliver_transient(SegPath, SDict) ->
(RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) ->
{[RelSeq | AckMeAcc], DeliverMeAcc}
end, {[], []}, SDict),
- {ok, Hdl} = file:open(SegPath, [binary, raw, write, delayed_write, read]),
+ {ok, Hdl} = file:open(SegPath, [binary, raw, read, write,
+ {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]),
{ok, _} = file:position(Hdl, {eof, 0}),
ok = file:write(Hdl, [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]),
@@ -523,7 +526,8 @@ deliver_transient(SegPath, SDict) ->
%%----------------------------------------------------------------------------
load_segment(SegNum, SegPath, JAckDict) ->
- case file:open(SegPath, [raw, binary, read_ahead, read]) of
+ case file:open(SegPath, [raw, binary, read,
+ {read_ahead, ?SEGMENT_TOTAL_SIZE}]) of
{error, enoent} -> {dict:new(), 0, 0};
{ok, Hdl} ->
{SDict, AckCount, HighRelSeq} =
@@ -596,7 +600,8 @@ append_acks_to_segment(SegPath, AckCount, Acks)
?SEGMENT_ENTRIES_COUNT;
append_acks_to_segment(SegPath, AckCount, Acks)
when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT ->
- {ok, Hdl} = file:open(SegPath, [raw, binary, delayed_write, write, read]),
+ {ok, Hdl} = file:open(SegPath, [raw, binary, read, write,
+ {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]),
{ok, _} = file:position(Hdl, {eof, 0}),
AckCount1 =
lists:foldl(
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 75c53f4f88..7d5f02f7a1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -51,6 +51,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_msg_store(),
+ passed = test_queue_index(),
passed = test_priority_queue(),
passed = test_unfold(),
passed = test_parsing(),
@@ -980,5 +981,76 @@ test_msg_store() ->
%% restart empty
ok = rabbit_msg_store:stop(),
{ok, _Pid3} = start_msg_store_empty(),
+ passed.
+
+queue_name(Name) ->
+ rabbit_misc:r(<<"/">>, queue, term_to_binary(Name)).
+
+test_queue() ->
+ queue_name(test).
+
+test_amqqueue(Durable) ->
+ #amqqueue{name = test_queue(),
+ durable = Durable,
+ auto_delete = true,
+ arguments = [],
+ pid = none}.
+
+empty_test_queue() ->
+ ok = rabbit_queue_index:start_msg_store([]),
+ {0, Qi1} = rabbit_queue_index:init(test_queue()),
+ _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1),
+ ok.
+
+queue_index_publish(SeqIds, Persistent, Qi) ->
+ lists:foldl(
+ fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
+ MsgId = rabbit_guid:guid(),
+ QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent,
+ QiN),
+ {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
+ end, {Qi, []}, SeqIds).
+
+test_queue_index() ->
+ ok = empty_test_queue(),
+ SeqIdsA = lists:seq(1,10000),
+ SeqIdsB = lists:seq(10001,20000),
+ {0, Qi0} = rabbit_queue_index:init(test_queue()),
+ {0, 0, Qi1} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
+ {Qi2, _SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
+ {0, 10001, Qi3} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2),
+ %% call terminate twice to prove it's idempotent
+ _Qi4 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi3)),
+ ok = rabbit_msg_store:stop(),
+ ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
+ %% should get length back as 0, as all the msgs were transient
+ {0, Qi5} = rabbit_queue_index:init(test_queue()),
+ {false, Qi6} = rabbit_queue_index:flush_journal(Qi5),
+ {0, 10001, Qi7} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
+ {Qi8, _SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
+ {0, 20001, Qi9} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8),
+ _Qi10 = rabbit_queue_index:terminate(Qi9),
+ ok = rabbit_msg_store:stop(),
+ ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
+ %% should get length back as 10000
+ LenB = length(SeqIdsB),
+ {LenB, Qi11} = rabbit_queue_index:init(test_queue()),
+ {0, 20001, Qi12} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi11),
+ Qi13 = lists:foldl(
+ fun (SeqId, QiN) ->
+ rabbit_queue_index:write_delivered(SeqId, QiN)
+ end, Qi12, SeqIdsB),
+ Qi14 = rabbit_queue_index:write_acks(SeqIdsB, Qi13),
+ {0, 20001, Qi15} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi14),
+ _Qi16 = rabbit_queue_index:terminate(Qi15),
ok = rabbit_msg_store:stop(),
+ ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
+ %% should get length back as 0 because all persistent msgs have been acked
+ {0, _Qi17} = rabbit_queue_index:init(test_queue()),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 00f3cce52a..75ff101e5a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -116,7 +116,7 @@
init(QueueName) ->
{GammaCount, IndexState} =
rabbit_queue_index:init(QueueName),
- {GammaSeqId, NextSeqId} =
+ {GammaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
Gamma = case GammaCount of
0 -> #gamma { seq_id = undefined, count = 0 };
@@ -129,7 +129,7 @@ init(QueueName) ->
target_ram_msg_count = undefined,
ram_msg_count = 0,
queue = QueueName,
- index_state = IndexState,
+ index_state = IndexState1,
next_seq_id = NextSeqId,
out_counter = 0,
egress_rate = 0,
@@ -303,15 +303,15 @@ delete(State) ->
IndexState1 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
IndexState) of
- {N, N} ->
- IndexState;
- {GammaSeqId, NextSeqId} ->
- {_DeleteCount, IndexState2} =
- delete1(NextSeqId, 0, GammaSeqId, IndexState),
- IndexState2
+ {N, N, IndexState2} ->
+ IndexState2;
+ {GammaSeqId, NextSeqId, IndexState2} ->
+ {_DeleteCount, IndexState3} =
+ delete1(NextSeqId, 0, GammaSeqId, IndexState2),
+ IndexState3
end,
- IndexState3 = rabbit_queue_index:terminate_and_erase(IndexState1),
- State1 #vqstate { index_state = IndexState3 }.
+ IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
+ State1 #vqstate { index_state = IndexState4 }.
%% [{Msg, AckTag}]
%% We guarantee that after fetch, only persistent msgs are left on