diff options
| -rwxr-xr-x | scripts/rabbitmq-server | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 100 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 13 |
5 files changed, 60 insertions, 79 deletions
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 34904850be..43488c9274 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -109,7 +109,7 @@ exec erl \ -os_mon start_cpu_sup true \ -os_mon start_disksup false \ -os_mon start_memsup false \ - -os_mon vm_memory_high_watermark 0.4 \ + -os_mon vm_memory_high_watermark 0.08 \ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ ${RABBITMQ_CLUSTER_CONFIG_OPTION} \ ${RABBITMQ_SERVER_START_ARGS} \ diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a79abe8cdb..b0c1ccacb8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -923,5 +923,6 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) -> VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS), + VQS2 = rabbit_variable_queue:full_flush_journal(VQS1), {hibernate, stop_egress_rate_timer( - State#q{ variable_queue_state = VQS1 })}. + State#q{ variable_queue_state = VQS2 })}. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 54d681c5d3..bd8996764e 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,10 +32,9 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/3, can_flush_journal/1, - flush_journal/1, read_segment_entries/2, next_segment_boundary/1, - segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, - start_msg_store/1]). + write_delivered/2, write_acks/2, sync_seq_ids/3, full_flush_journal/1, + read_segment_entries/2, next_segment_boundary/1, segment_size/0, + find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). %%---------------------------------------------------------------------------- %% The queue disk index @@ -150,8 +149,7 @@ -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync_seq_ids/3 :: ([seq_id()], boolean(), qistate()) -> qistate()). --spec(can_flush_journal/1 :: (qistate()) -> boolean()). --spec(flush_journal/1 :: (qistate()) -> qistate()). +-spec(full_flush_journal/1 :: (qistate()) -> qistate()). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). @@ -228,40 +226,10 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) -> StateM end, State1, SegNumsSet). -can_flush_journal(#qistate { journal_count = 0 }) -> - false; -can_flush_journal(_) -> - true. - -flush_journal(State = #qistate { journal_count = 0 }) -> +full_flush_journal(State = #qistate { journal_count = 0 }) -> State; -flush_journal(State = #qistate { journal_ack_dict = JAckDict, - journal_del_dict = JDelDict, - journal_count = JCount }) -> - SegNum = case dict:fetch_keys(JAckDict) of - [] -> hd(dict:fetch_keys(JDelDict)); - [N|_] -> N - end, - Dels = seg_entries_from_dict(SegNum, JDelDict), - Acks = seg_entries_from_dict(SegNum, JAckDict), - State1 = append_dels_to_segment(SegNum, Dels, State), - State2 = append_acks_to_segment(SegNum, Acks, State1), - JCount1 = JCount - length(Dels) - length(Acks), - State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict), - journal_ack_dict = dict:erase(SegNum, JAckDict), - journal_count = JCount1 }, - if - JCount1 == 0 -> - {Hdl, State4} = get_journal_handle(State3), - {ok, 0} = file_handle_cache:position(Hdl, bof), - ok = file_handle_cache:truncate(Hdl), - ok = file_handle_cache:sync(Hdl), - State4; - JCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> - flush_journal(State3); - true -> - State3 - end. +full_flush_journal(State) -> + full_flush_journal(flush_journal(State)). read_segment_entries(InitSeqId, State) -> {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), @@ -349,6 +317,47 @@ start_msg_store(DurableQueues) -> %%---------------------------------------------------------------------------- +%% Journal Flushing +%%---------------------------------------------------------------------------- + +flush_journal(State = #qistate { journal_count = 0 }) -> + State; +flush_journal(State = #qistate { journal_ack_dict = JAckDict, + journal_del_dict = JDelDict, + journal_count = JCount }) -> + SegNum = case dict:fetch_keys(JAckDict) of + [] -> hd(dict:fetch_keys(JDelDict)); + [N|_] -> N + end, + Dels = seg_entries_from_dict(SegNum, JDelDict), + Acks = seg_entries_from_dict(SegNum, JAckDict), + State1 = append_dels_to_segment(SegNum, Dels, State), + State2 = append_acks_to_segment(SegNum, Acks, State1), + JCount1 = JCount - length(Dels) - length(Acks), + State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict), + journal_ack_dict = dict:erase(SegNum, JAckDict), + journal_count = JCount1 }, + if + JCount1 == 0 -> + {Hdl, State4} = get_journal_handle(State3), + {ok, 0} = file_handle_cache:position(Hdl, bof), + ok = file_handle_cache:truncate(Hdl), + ok = file_handle_cache:sync(Hdl), + State4; + JCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> + flush_journal(State3); + true -> + State3 + end. + +maybe_full_flush(State = #qistate { journal_count = JCount }) -> + case JCount > ?MAX_ACK_JOURNAL_ENTRY_COUNT of + true -> full_flush_journal(State); + false -> State + end. + + +%%---------------------------------------------------------------------------- %% Minor Helpers %%---------------------------------------------------------------------------- @@ -363,19 +372,6 @@ write_to_journal(BinList, SeqIds, Dict, end, {Dict, JCount}, SeqIds), {Dict1, State1 #qistate { journal_count = JCount1 }}. -maybe_full_flush(State = #qistate { journal_count = JCount }) -> - case JCount > ?MAX_ACK_JOURNAL_ENTRY_COUNT of - true -> full_flush_journal(State); - false -> State - end. - -full_flush_journal(State) -> - case can_flush_journal(State) of - true -> State1 = flush_journal(State), - full_flush_journal(State1); - false -> State - end. - queue_name_to_dir_name(Name = #resource { kind = queue }) -> Bin = term_to_binary(Name), Size = 8*size(Bin), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d74f998e33..5b453b627a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1032,15 +1032,7 @@ queue_index_deliver(SeqIds, Qi) -> end, Qi, SeqIds). queue_index_flush_journal(Qi) -> - {_Oks, {false, Qi1}} = - rabbit_misc:unfold( - fun ({true, QiN}) -> - QiM = rabbit_queue_index:flush_journal(QiN), - {true, ok, {rabbit_queue_index:can_flush_journal(QiM), QiM}}; - ({false, _QiN}) -> - false - end, {true, Qi}), - Qi1. + rabbit_queue_index:full_flush_journal(Qi). verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; @@ -1071,7 +1063,6 @@ test_queue_index() -> ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]), %% should get length back as 0, as all the msgs were transient {0, Qi6} = rabbit_queue_index:init(test_queue()), - false = rabbit_queue_index:can_flush_journal(Qi6), {0, 10000, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), @@ -1093,8 +1084,7 @@ test_queue_index() -> ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsMsgIdsB)), Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15), - true = rabbit_queue_index:can_flush_journal(Qi16), - Qi17 = rabbit_queue_index:flush_journal(Qi16), + Qi17 = queue_index_flush_journal(Qi16), %% the entire first segment will have gone as they were firstly %% transient, and secondly ack'd SegmentSize = rabbit_queue_index:segment_size(), @@ -1212,11 +1202,10 @@ test_variable_queue_dynamic_duration_change() -> {_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6), {VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7), VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8), - VQ10 = rabbit_variable_queue:flush_journal(VQ9), - VQ11 = rabbit_variable_queue:flush_journal(VQ10), - {empty, VQ12} = rabbit_variable_queue:fetch(VQ11), + VQ10 = rabbit_variable_queue:full_flush_journal(VQ9), + {empty, VQ11} = rabbit_variable_queue:fetch(VQ10), - rabbit_variable_queue:terminate(VQ12), + rabbit_variable_queue:terminate(VQ11), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0bce4c2b1a..7c644f59f5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -36,8 +36,7 @@ ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4, - tx_commit_from_vq/1, needs_sync/1, can_flush_journal/1, - flush_journal/1, status/1]). + tx_commit_from_vq/1, needs_sync/1, full_flush_journal/1, status/1]). %%---------------------------------------------------------------------------- @@ -150,8 +149,7 @@ ([msg_id()], [ack()], {pid(), any()}, vqstate()) -> vqstate()). -spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()). -spec(needs_sync/1 :: (vqstate()) -> boolean()). --spec(can_flush_journal/1 :: (vqstate()) -> boolean()). --spec(flush_journal/1 :: (vqstate()) -> vqstate()). +-spec(full_flush_journal/1 :: (vqstate()) -> vqstate()). -spec(status/1 :: (vqstate()) -> [{atom(), any()}]). -endif. @@ -485,12 +483,9 @@ needs_sync(#vqstate { on_sync = {_, _, []} }) -> needs_sync(_) -> true. -can_flush_journal(#vqstate { index_state = IndexState }) -> - rabbit_queue_index:can_flush_journal(IndexState). - -flush_journal(State = #vqstate { index_state = IndexState }) -> +full_flush_journal(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = - rabbit_queue_index:flush_journal(IndexState) }. + rabbit_queue_index:full_flush_journal(IndexState) }. status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, |
