diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 83 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
4 files changed, 41 insertions, 56 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 945cd8bd5d..7364c479ff 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -925,7 +925,7 @@ handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) -> - VQS1 = rabbit_variable_queue:full_flush_journal(VQS), + VQS1 = rabbit_variable_queue:flush_journal(VQS), %% no activity for a while == 0 egress and ingress rates DesiredDuration = rabbit_memory_monitor:report_queue_duration(self(), infinity), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6fe788f918..6e1f496ac6 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,7 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, sync_seq_ids/3, full_flush_journal/1, + write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). @@ -149,7 +149,7 @@ -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync_seq_ids/3 :: ([seq_id()], boolean(), qistate()) -> qistate()). --spec(full_flush_journal/1 :: (qistate()) -> qistate()). +-spec(flush_journal/1 :: (qistate()) -> qistate()). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). @@ -198,13 +198,13 @@ write_delivered(SeqId, State = #qistate { journal_del_dict = JDelDict }) -> {JDelDict1, State1} = write_to_journal([<<?DEL_BIT:1, SeqId:?SEQ_BITS>>], [SeqId], JDelDict, State), - maybe_full_flush(State1 #qistate { journal_del_dict = JDelDict1 }). + maybe_flush(State1 #qistate { journal_del_dict = JDelDict1 }). write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) -> {JAckDict1, State1} = write_to_journal([<<?ACK_BIT:1, SeqId:?SEQ_BITS>> || SeqId <- SeqIds], SeqIds, JAckDict, State), - maybe_full_flush(State1 #qistate { journal_ack_dict = JAckDict1 }). + maybe_flush(State1 #qistate { journal_ack_dict = JAckDict1 }). sync_seq_ids(SeqIds, SyncAckJournal, State) -> State1 = case SyncAckJournal of @@ -226,10 +226,31 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) -> StateM end, State1, SegNumsSet). -full_flush_journal(State = #qistate { journal_count = 0 }) -> +flush_journal(State = #qistate { journal_count = 0 }) -> State; -full_flush_journal(State) -> - full_flush_journal(flush_journal(State)). +flush_journal(State = #qistate { journal_ack_dict = JAckDict, + journal_del_dict = JDelDict, + journal_count = JCount }) -> + SegNum = case dict:fetch_keys(JAckDict) of + [] -> hd(dict:fetch_keys(JDelDict)); + [N|_] -> N + end, + Dels = seg_entries_from_dict(SegNum, JDelDict), + Acks = seg_entries_from_dict(SegNum, JAckDict), + State1 = append_dels_to_segment(SegNum, Dels, State), + State2 = append_acks_to_segment(SegNum, Acks, State1), + JCount1 = JCount - length(Dels) - length(Acks), + State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict), + journal_ack_dict = dict:erase(SegNum, JAckDict), + journal_count = JCount1 }, + case JCount1 of + 0 -> {Hdl, State4} = get_journal_handle(State3), + {ok, 0} = file_handle_cache:position(Hdl, bof), + ok = file_handle_cache:truncate(Hdl), + ok = file_handle_cache:sync(Hdl), + State4; + _ -> flush_journal(State3) + end. read_segment_entries(InitSeqId, State) -> {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), @@ -315,52 +336,16 @@ start_msg_store(DurableQueues) -> end, TransientDirs), ok. - -%%---------------------------------------------------------------------------- -%% Journal Flushing -%%---------------------------------------------------------------------------- - -flush_journal(State = #qistate { journal_count = 0 }) -> - State; -flush_journal(State = #qistate { journal_ack_dict = JAckDict, - journal_del_dict = JDelDict, - journal_count = JCount }) -> - SegNum = case dict:fetch_keys(JAckDict) of - [] -> hd(dict:fetch_keys(JDelDict)); - [N|_] -> N - end, - Dels = seg_entries_from_dict(SegNum, JDelDict), - Acks = seg_entries_from_dict(SegNum, JAckDict), - State1 = append_dels_to_segment(SegNum, Dels, State), - State2 = append_acks_to_segment(SegNum, Acks, State1), - JCount1 = JCount - length(Dels) - length(Acks), - State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict), - journal_ack_dict = dict:erase(SegNum, JAckDict), - journal_count = JCount1 }, - if - JCount1 == 0 -> - {Hdl, State4} = get_journal_handle(State3), - {ok, 0} = file_handle_cache:position(Hdl, bof), - ok = file_handle_cache:truncate(Hdl), - ok = file_handle_cache:sync(Hdl), - State4; - JCount1 > ?MAX_JOURNAL_ENTRY_COUNT -> - flush_journal(State3); - true -> - State3 - end. - -maybe_full_flush(State = #qistate { journal_count = JCount }) -> - case JCount > ?MAX_JOURNAL_ENTRY_COUNT of - true -> full_flush_journal(State); - false -> State - end. - - %%---------------------------------------------------------------------------- %% Minor Helpers %%---------------------------------------------------------------------------- +maybe_flush(State = #qistate { journal_count = JCount }) + when JCount > ?MAX_JOURNAL_ENTRY_COUNT -> + flush_journal(State); +maybe_flush(State) -> + State. + write_to_journal(BinList, SeqIds, Dict, State = #qistate { journal_count = JCount }) -> {Hdl, State1} = get_journal_handle(State), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 43fdaf3b8b..c931e0b051 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1065,7 +1065,7 @@ queue_index_deliver(SeqIds, Qi) -> end, Qi, SeqIds). queue_index_flush_journal(Qi) -> - rabbit_queue_index:full_flush_journal(Qi). + rabbit_queue_index:flush_journal(Qi). verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; @@ -1230,7 +1230,7 @@ test_variable_queue_dynamic_duration_change() -> {_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6), {VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7), VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8), - VQ10 = rabbit_variable_queue:full_flush_journal(VQ9), + VQ10 = rabbit_variable_queue:flush_journal(VQ9), {empty, VQ11} = rabbit_variable_queue:fetch(VQ10), rabbit_variable_queue:terminate(VQ11), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6806a0cdc8..3958216e45 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -36,7 +36,7 @@ ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1, - full_flush_journal/1, status/1]). + flush_journal/1, status/1]). %%---------------------------------------------------------------------------- @@ -154,7 +154,7 @@ ([msg_id()], [ack()], {pid(), any()}, vqstate()) -> vqstate()). -spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()). -spec(needs_sync/1 :: (vqstate()) -> boolean()). --spec(full_flush_journal/1 :: (vqstate()) -> vqstate()). +-spec(flush_journal/1 :: (vqstate()) -> vqstate()). -spec(status/1 :: (vqstate()) -> [{atom(), any()}]). -endif. @@ -463,9 +463,9 @@ needs_sync(#vqstate { on_sync = {_, _, []} }) -> needs_sync(_) -> true. -full_flush_journal(State = #vqstate { index_state = IndexState }) -> +flush_journal(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = - rabbit_queue_index:full_flush_journal(IndexState) }. + rabbit_queue_index:flush_journal(IndexState) }. status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, |
