diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-26 15:29:56 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-26 15:29:56 +0000 |
| commit | 896dee8c4acffe20e2e3c4dda3358678ca02e5ac (patch) | |
| tree | 533701b62d894aba49078a94a9c5040a8ed0c7bb | |
| parent | 595098e4b5c04b5031facfec20c9c92f337191ec (diff) | |
| download | rabbitmq-server-git-896dee8c4acffe20e2e3c4dda3358678ca02e5ac.tar.gz | |
Mainly a load of cosmetics, and some minor API changes, but also manage to hook in getting the queue to flush the journal out if the queue is idle and has no work to do. This takes advantage of the ability to incrementally flush out the ack journal.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 10 |
4 files changed, 69 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cd94c6e450..180a9f8a07 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -153,28 +153,32 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, NewState1, Timeout}. -next_state(State = #q { variable_queue_state = VQS }) -> +next_state(State = #q{variable_queue_state = VQS}) -> next_state1(State, rabbit_variable_queue:needs_sync(VQS)). -next_state1(State = #q { sync_timer_ref = undefined }, true) -> +next_state1(State = #q{sync_timer_ref = undefined}, true) -> {start_sync_timer(State), 0}; next_state1(State, true) -> {State, 0}; -next_state1(State = #q { sync_timer_ref = undefined }, false) -> - {State, hibernate}; +next_state1(State = #q{sync_timer_ref = undefined, + variable_queue_state = VQS}, false) -> + {State, case rabbit_variable_queue:can_flush_journal(VQS) of + true -> 0; + false -> hibernate + end}; next_state1(State, false) -> {stop_sync_timer(State), 0}. -start_sync_timer(State = #q { sync_timer_ref = undefined }) -> +start_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue, tx_commit_vq_callback, [self()]), - State #q { sync_timer_ref = TRef }. + State#q{sync_timer_ref = TRef}. -stop_sync_timer(State = #q { sync_timer_ref = TRef }) -> +stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), - State #q { sync_timer_ref = undefined }. + State#q{sync_timer_ref = undefined}. -assert_invariant(#q { active_consumers = AC, variable_queue_state = VQS }) -> +assert_invariant(#q{active_consumers = AC, variable_queue_state = VQS}) -> true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)). lookup_ch(ChPid) -> @@ -868,6 +872,13 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {stop, NewState} -> {stop, normal, NewState} end; +handle_info(timeout, State = #q{variable_queue_state = VQS, + sync_timer_ref = undefined}) -> + %% if sync_timer_ref is undefined then we must have set the + %% timeout to zero because we thought we could flush the journal + noreply(State#q{variable_queue_state = + rabbit_variable_queue:flush_journal(VQS)}); + handle_info(timeout, State = #q{variable_queue_state = VQS}) -> noreply( run_message_queue( diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e3057f9cb4..39116b0dd2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,7 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/3, + write_delivered/2, write_acks/2, can_flush_journal/1, flush_journal/1, sync_seq_ids/3, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). @@ -136,7 +136,8 @@ -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). --spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}). +-spec(flush_journal/1 :: (qistate()) -> qistate()). +-spec(can_flush_journal/1 :: (qistate()) -> boolean()). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> {( [{msg_id(), seq_id(), boolean(), boolean()}] | 'not_found'), qistate()}). @@ -211,12 +212,6 @@ write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict, false -> State2 end. -full_flush_journal(State) -> - case flush_journal(State) of - {true, State1} -> full_flush_journal(State1); - {false, State1} -> State1 - end. - sync_seq_ids(SeqIds, SyncAckJournal, State) -> State1 = case SyncAckJournal of true -> {Hdl, State2} = get_journal_handle(State), @@ -237,8 +232,13 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) -> StateM end, State1, SegNumsSet). +can_flush_journal(#qistate { journal_ack_count = 0 }) -> + false; +can_flush_journal(_) -> + true. + flush_journal(State = #qistate { journal_ack_count = 0 }) -> - {false, State}; + State; flush_journal(State = #qistate { journal_ack_dict = JAckDict, journal_ack_count = JAckCount }) -> [SegNum|_] = dict:fetch_keys(JAckDict), @@ -253,11 +253,11 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict, ok = file_handle_cache:position(Hdl, bof), ok = file_handle_cache:truncate(Hdl), ok = file_handle_cache:sync(Hdl), - {false, State3}; + State3; JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> flush_journal(State2); true -> - {true, State2} + State2 end. read_segment_entries(InitSeqId, State) -> @@ -348,6 +348,13 @@ start_msg_store(DurableQueues) -> %% Minor Helpers %%---------------------------------------------------------------------------- +full_flush_journal(State) -> + case can_flush_journal(State) of + true -> State1 = flush_journal(State), + full_flush_journal(State1); + false -> State + end. + queue_name_to_dir_name(Name = #resource { kind = queue }) -> Bin = term_to_binary(Name), Size = 8*size(Bin), @@ -421,6 +428,11 @@ add_ack_to_ack_dict(SeqId, ADict) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). +all_segment_nums(Dir) -> + [list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) + || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. + %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function @@ -454,11 +466,6 @@ queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Entries], %% Startup Functions %%---------------------------------------------------------------------------- -all_segment_nums(Dir) -> - [list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) - || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. - find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) -> SegNums = all_segment_nums(Dir), {TotalMsgCount, State1} = diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8114247604..ebd8432a20 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1040,31 +1040,33 @@ test_queue_index() -> ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), %% should get length back as 0, as all the msgs were transient {0, Qi6} = rabbit_queue_index:init(test_queue()), - {false, Qi7} = rabbit_queue_index:flush_journal(Qi6), - {0, 10001, Qi8} = - rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi7), - {Qi9, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi8), - {0, 20001, Qi10} = - rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi9), - {ReadB, Qi11} = rabbit_queue_index:read_segment_entries(0, Qi10), + false = rabbit_queue_index:can_flush_journal(Qi6), + {0, 10001, Qi7} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), + {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), + {0, 20001, Qi9} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8), + {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9), ok = verify_read_with_published(false, true, ReadB, lists:reverse(SeqIdsMsgIdsB)), - _Qi12 = rabbit_queue_index:terminate(Qi11), + _Qi11 = rabbit_queue_index:terminate(Qi10), ok = rabbit_msg_store:stop(), ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), %% should get length back as 10000 LenB = length(SeqIdsB), - {LenB, Qi13} = rabbit_queue_index:init(test_queue()), - {0, 20001, Qi14} = - rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi13), - Qi15 = lists:foldl( + {LenB, Qi12} = rabbit_queue_index:init(test_queue()), + {0, 20001, Qi13} = + rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12), + Qi14 = lists:foldl( fun (SeqId, QiN) -> rabbit_queue_index:write_delivered(SeqId, QiN) - end, Qi14, SeqIdsB), - {ReadC, Qi16} = rabbit_queue_index:read_segment_entries(0, Qi15), + end, Qi13, SeqIdsB), + {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14), ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsMsgIdsB)), - Qi17 = rabbit_queue_index:write_acks(SeqIdsB, Qi16), + Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15), + true = rabbit_queue_index:can_flush_journal(Qi16), + Qi17 = rabbit_queue_index:flush_journal(Qi16), {0, 20001, Qi18} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17), _Qi19 = rabbit_queue_index:terminate(Qi18), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 33e09c113d..da56487e01 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -35,7 +35,8 @@ set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, - tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1]). + tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1, + can_flush_journal/1, flush_journal/1]). %%---------------------------------------------------------------------------- @@ -412,6 +413,13 @@ needs_sync(#vqstate { on_sync = {_, _, []} }) -> needs_sync(_) -> true. +can_flush_journal(#vqstate { index_state = IndexState }) -> + rabbit_queue_index:can_flush_journal(IndexState). + +flush_journal(State = #vqstate { index_state = IndexState }) -> + State #vqstate { index_state = + rabbit_queue_index:flush_journal(IndexState) }. + %%---------------------------------------------------------------------------- persistent_msg_ids(Pubs) -> |
