diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-19 15:57:03 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-19 15:57:03 +0100 |
| commit | 7e2848234bf3ab1500db09dfcc555c44e2c273f9 (patch) | |
| tree | 8f20578ca71d7fe422e275c271cd604a113b8038 /src | |
| parent | cb7542b5afd0f7bc1bdbeadd48299a4bdf3720b9 (diff) | |
| download | rabbitmq-server-git-7e2848234bf3ab1500db09dfcc555c44e2c273f9.tar.gz | |
Some minor API changes which are pretty sensible anyway, but also make writing tests much easier. Also, tests for queue_index which hit 90% code coverage for the module. Profiling in progress to try and figure out why it's not quite as blazingly fast as I expected.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
4 files changed, 106 insertions, 28 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index a5e59ce22a..215c1bc421 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -156,7 +156,8 @@ start(normal, []) -> ok = maybe_insert_default_data(), ok = rabbit_exchange:recover(), %% TODO - this should probably use start_child somehow too - {ok, DurableQueues} = rabbit_queue_index:start_msg_store(), + DurableQueues = rabbit_amqqueue:find_durable_queues(), + ok = rabbit_queue_index:start_msg_store(DurableQueues), {ok, _RealDurableQueues} = rabbit_amqqueue:recover(DurableQueues) %% TODO - RealDurableQueues is a subset of %% DurableQueues. It may have queues removed which diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index dac174026c..3471913f65 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -34,7 +34,7 @@ -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, write_delivered/2, write_acks/2, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, - find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/0]). + find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). %%---------------------------------------------------------------------------- %% The queue disk index @@ -149,8 +149,8 @@ -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(segment_size/0 :: () -> non_neg_integer()). -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> - {non_neg_integer(), non_neg_integer()}). --spec(start_msg_store/0 :: () -> {'ok', [amqqueue()]}). + {non_neg_integer(), non_neg_integer(), qistate()}). +-spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok'). -endif. @@ -283,7 +283,7 @@ segment_size() -> ?SEGMENT_ENTRIES_COUNT. find_lowest_seq_id_seg_and_next_seq_id( - #qistate { dir = Dir, journal_ack_dict = JAckDict }) -> + State = #qistate { dir = Dir, journal_ack_dict = JAckDict }) -> SegNumsPaths = all_segment_nums_paths(Dir), %% We don't want the lowest seq_id, merely the seq_id of the start %% of the lowest segment. That seq_id may not actually exist, but @@ -295,18 +295,18 @@ find_lowest_seq_id_seg_and_next_seq_id( _ -> {SegNum1, _SegPath1} = lists:min(SegNumsPaths), reconstruct_seq_id(SegNum1, 0) end, - NextSeqId = + {NextSeqId, State1} = case SegNumsPaths of - [] -> 0; + [] -> {0, State}; _ -> {SegNum2, SegPath2} = lists:max(SegNumsPaths), + State2 = close_file_handle_for_seg(SegNum2, State), {_SDict, _AckCount, HighRelSeq} = load_segment(SegNum2, SegPath2, JAckDict), - 1 + reconstruct_seq_id(SegNum2, HighRelSeq) + {1 + reconstruct_seq_id(SegNum2, HighRelSeq), State2} end, - {LowSeqIdSeg, NextSeqId}. + {LowSeqIdSeg, NextSeqId, State1}. -start_msg_store() -> - DurableQueues = rabbit_amqqueue:find_durable_queues(), +start_msg_store(DurableQueues) -> DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name), Queue #amqqueue.name} || Queue <- DurableQueues ]), @@ -339,7 +339,7 @@ start_msg_store() -> Dir = filename:join(queues_dir(), DirName), ok = delete_queue_directory(Dir) end, TransientDirs), - {ok, DurableQueues}. + ok. %%---------------------------------------------------------------------------- %% Minor Helpers @@ -375,7 +375,8 @@ get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) -> State1 = #qistate { dir = Dir } = close_file_handle_for_seg(CurSegNum, State), {ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum), - [binary, raw, write, delayed_write, read]), + [binary, raw, read, write, + {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]), {ok, _} = file:position(Hdl, {eof, 0}), {Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}. @@ -410,8 +411,9 @@ queue_index_walker([]) -> finished; queue_index_walker([QueueName|QueueNames]) -> {TotalMsgCount, State} = init(QueueName), - {LowSeqIdSeg, _NextSeqId} = find_lowest_seq_id_seg_and_next_seq_id(State), - queue_index_walker({TotalMsgCount, LowSeqIdSeg, State, QueueNames}); + {LowSeqIdSeg, _NextSeqId, State1} = + find_lowest_seq_id_seg_and_next_seq_id(State), + queue_index_walker({TotalMsgCount, LowSeqIdSeg, State1, QueueNames}); queue_index_walker({0, _LowSeqIdSeg, State, QueueNames}) -> terminate(State), @@ -510,7 +512,8 @@ deliver_transient(SegPath, SDict) -> (RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) -> {[RelSeq | AckMeAcc], DeliverMeAcc} end, {[], []}, SDict), - {ok, Hdl} = file:open(SegPath, [binary, raw, write, delayed_write, read]), + {ok, Hdl} = file:open(SegPath, [binary, raw, read, write, + {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]), {ok, _} = file:position(Hdl, {eof, 0}), ok = file:write(Hdl, [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]), @@ -523,7 +526,8 @@ deliver_transient(SegPath, SDict) -> %%---------------------------------------------------------------------------- load_segment(SegNum, SegPath, JAckDict) -> - case file:open(SegPath, [raw, binary, read_ahead, read]) of + case file:open(SegPath, [raw, binary, read, + {read_ahead, ?SEGMENT_TOTAL_SIZE}]) of {error, enoent} -> {dict:new(), 0, 0}; {ok, Hdl} -> {SDict, AckCount, HighRelSeq} = @@ -596,7 +600,8 @@ append_acks_to_segment(SegPath, AckCount, Acks) ?SEGMENT_ENTRIES_COUNT; append_acks_to_segment(SegPath, AckCount, Acks) when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT -> - {ok, Hdl} = file:open(SegPath, [raw, binary, delayed_write, write, read]), + {ok, Hdl} = file:open(SegPath, [raw, binary, read, write, + {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]), {ok, _} = file:position(Hdl, {eof, 0}), AckCount1 = lists:foldl( diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 75c53f4f88..7d5f02f7a1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -51,6 +51,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_msg_store(), + passed = test_queue_index(), passed = test_priority_queue(), passed = test_unfold(), passed = test_parsing(), @@ -980,5 +981,76 @@ test_msg_store() -> %% restart empty ok = rabbit_msg_store:stop(), {ok, _Pid3} = start_msg_store_empty(), + passed. + +queue_name(Name) -> + rabbit_misc:r(<<"/">>, queue, term_to_binary(Name)). + +test_queue() -> + queue_name(test). + +test_amqqueue(Durable) -> + #amqqueue{name = test_queue(), + durable = Durable, + auto_delete = true, + arguments = [], + pid = none}. + +empty_test_queue() -> + ok = rabbit_queue_index:start_msg_store([]), + {0, Qi1} = rabbit_queue_index:init(test_queue()), + _Qi2 = rabbit_queue_index:terminate_and_erase(Qi1), + ok. + +queue_index_publish(SeqIds, Persistent, Qi) -> + lists:foldl( + fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> + MsgId = rabbit_guid:guid(), + QiM = rabbit_queue_index:write_published(MsgId, SeqId, Persistent, + QiN), + {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]} + end, {Qi, []}, SeqIds). + +test_queue_index() -> + ok = empty_test_queue(), + SeqIdsA = lists:seq(1,10000), + SeqIdsB = lists:seq(10001,20000), + {0, Qi0} = rabbit_queue_index:init(test_queue()), + {0, 0, Qi1} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), + {Qi2, _SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), + {0, 10001, Qi3} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2), + %% call terminate twice to prove it's idempotent + _Qi4 = rabbit_queue_index:terminate(rabbit_queue_index:terminate(Qi3)), + ok = rabbit_msg_store:stop(), + ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), + %% should get length back as 0, as all the msgs were transient + {0, Qi5} = rabbit_queue_index:init(test_queue()), + {false, Qi6} = rabbit_queue_index:flush_journal(Qi5), + {0, 10001, Qi7} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), + {Qi8, _SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), + {0, 20001, Qi9} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8), + _Qi10 = rabbit_queue_index:terminate(Qi9), + ok = rabbit_msg_store:stop(), + ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), + %% should get length back as 10000 + LenB = length(SeqIdsB), + {LenB, Qi11} = rabbit_queue_index:init(test_queue()), + {0, 20001, Qi12} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi11), + Qi13 = lists:foldl( + fun (SeqId, QiN) -> + rabbit_queue_index:write_delivered(SeqId, QiN) + end, Qi12, SeqIdsB), + Qi14 = rabbit_queue_index:write_acks(SeqIdsB, Qi13), + {0, 20001, Qi15} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi14), + _Qi16 = rabbit_queue_index:terminate(Qi15), ok = rabbit_msg_store:stop(), + ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), + %% should get length back as 0 because all persistent msgs have been acked + {0, _Qi17} = rabbit_queue_index:init(test_queue()), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 00f3cce52a..75ff101e5a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -116,7 +116,7 @@ init(QueueName) -> {GammaCount, IndexState} = rabbit_queue_index:init(QueueName), - {GammaSeqId, NextSeqId} = + {GammaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), Gamma = case GammaCount of 0 -> #gamma { seq_id = undefined, count = 0 }; @@ -129,7 +129,7 @@ init(QueueName) -> target_ram_msg_count = undefined, ram_msg_count = 0, queue = QueueName, - index_state = IndexState, + index_state = IndexState1, next_seq_id = NextSeqId, out_counter = 0, egress_rate = 0, @@ -303,15 +303,15 @@ delete(State) -> IndexState1 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( IndexState) of - {N, N} -> - IndexState; - {GammaSeqId, NextSeqId} -> - {_DeleteCount, IndexState2} = - delete1(NextSeqId, 0, GammaSeqId, IndexState), - IndexState2 + {N, N, IndexState2} -> + IndexState2; + {GammaSeqId, NextSeqId, IndexState2} -> + {_DeleteCount, IndexState3} = + delete1(NextSeqId, 0, GammaSeqId, IndexState2), + IndexState3 end, - IndexState3 = rabbit_queue_index:terminate_and_erase(IndexState1), - State1 #vqstate { index_state = IndexState3 }. + IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), + State1 #vqstate { index_state = IndexState4 }. %% [{Msg, AckTag}] %% We guarantee that after fetch, only persistent msgs are left on |
