summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_queue_index.erl83
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl8
4 files changed, 41 insertions, 56 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 945cd8bd5d..7364c479ff 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -925,7 +925,7 @@ handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) ->
- VQS1 = rabbit_variable_queue:full_flush_journal(VQS),
+ VQS1 = rabbit_variable_queue:flush_journal(VQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
rabbit_memory_monitor:report_queue_duration(self(), infinity),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6fe788f918..6e1f496ac6 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, sync_seq_ids/3, full_flush_journal/1,
+ write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
@@ -149,7 +149,7 @@
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync_seq_ids/3 :: ([seq_id()], boolean(), qistate()) -> qistate()).
--spec(full_flush_journal/1 :: (qistate()) -> qistate()).
+-spec(flush_journal/1 :: (qistate()) -> qistate()).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
{[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
@@ -198,13 +198,13 @@ write_delivered(SeqId, State = #qistate { journal_del_dict = JDelDict }) ->
{JDelDict1, State1} =
write_to_journal([<<?DEL_BIT:1, SeqId:?SEQ_BITS>>],
[SeqId], JDelDict, State),
- maybe_full_flush(State1 #qistate { journal_del_dict = JDelDict1 }).
+ maybe_flush(State1 #qistate { journal_del_dict = JDelDict1 }).
write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) ->
{JAckDict1, State1} =
write_to_journal([<<?ACK_BIT:1, SeqId:?SEQ_BITS>> || SeqId <- SeqIds],
SeqIds, JAckDict, State),
- maybe_full_flush(State1 #qistate { journal_ack_dict = JAckDict1 }).
+ maybe_flush(State1 #qistate { journal_ack_dict = JAckDict1 }).
sync_seq_ids(SeqIds, SyncAckJournal, State) ->
State1 = case SyncAckJournal of
@@ -226,10 +226,31 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) ->
StateM
end, State1, SegNumsSet).
-full_flush_journal(State = #qistate { journal_count = 0 }) ->
+flush_journal(State = #qistate { journal_count = 0 }) ->
State;
-full_flush_journal(State) ->
- full_flush_journal(flush_journal(State)).
+flush_journal(State = #qistate { journal_ack_dict = JAckDict,
+ journal_del_dict = JDelDict,
+ journal_count = JCount }) ->
+ SegNum = case dict:fetch_keys(JAckDict) of
+ [] -> hd(dict:fetch_keys(JDelDict));
+ [N|_] -> N
+ end,
+ Dels = seg_entries_from_dict(SegNum, JDelDict),
+ Acks = seg_entries_from_dict(SegNum, JAckDict),
+ State1 = append_dels_to_segment(SegNum, Dels, State),
+ State2 = append_acks_to_segment(SegNum, Acks, State1),
+ JCount1 = JCount - length(Dels) - length(Acks),
+ State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict),
+ journal_ack_dict = dict:erase(SegNum, JAckDict),
+ journal_count = JCount1 },
+ case JCount1 of
+ 0 -> {Hdl, State4} = get_journal_handle(State3),
+ {ok, 0} = file_handle_cache:position(Hdl, bof),
+ ok = file_handle_cache:truncate(Hdl),
+ ok = file_handle_cache:sync(Hdl),
+ State4;
+ _ -> flush_journal(State3)
+ end.
read_segment_entries(InitSeqId, State) ->
{SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
@@ -315,52 +336,16 @@ start_msg_store(DurableQueues) ->
end, TransientDirs),
ok.
-
-%%----------------------------------------------------------------------------
-%% Journal Flushing
-%%----------------------------------------------------------------------------
-
-flush_journal(State = #qistate { journal_count = 0 }) ->
- State;
-flush_journal(State = #qistate { journal_ack_dict = JAckDict,
- journal_del_dict = JDelDict,
- journal_count = JCount }) ->
- SegNum = case dict:fetch_keys(JAckDict) of
- [] -> hd(dict:fetch_keys(JDelDict));
- [N|_] -> N
- end,
- Dels = seg_entries_from_dict(SegNum, JDelDict),
- Acks = seg_entries_from_dict(SegNum, JAckDict),
- State1 = append_dels_to_segment(SegNum, Dels, State),
- State2 = append_acks_to_segment(SegNum, Acks, State1),
- JCount1 = JCount - length(Dels) - length(Acks),
- State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict),
- journal_ack_dict = dict:erase(SegNum, JAckDict),
- journal_count = JCount1 },
- if
- JCount1 == 0 ->
- {Hdl, State4} = get_journal_handle(State3),
- {ok, 0} = file_handle_cache:position(Hdl, bof),
- ok = file_handle_cache:truncate(Hdl),
- ok = file_handle_cache:sync(Hdl),
- State4;
- JCount1 > ?MAX_JOURNAL_ENTRY_COUNT ->
- flush_journal(State3);
- true ->
- State3
- end.
-
-maybe_full_flush(State = #qistate { journal_count = JCount }) ->
- case JCount > ?MAX_JOURNAL_ENTRY_COUNT of
- true -> full_flush_journal(State);
- false -> State
- end.
-
-
%%----------------------------------------------------------------------------
%% Minor Helpers
%%----------------------------------------------------------------------------
+maybe_flush(State = #qistate { journal_count = JCount })
+ when JCount > ?MAX_JOURNAL_ENTRY_COUNT ->
+ flush_journal(State);
+maybe_flush(State) ->
+ State.
+
write_to_journal(BinList, SeqIds, Dict,
State = #qistate { journal_count = JCount }) ->
{Hdl, State1} = get_journal_handle(State),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 43fdaf3b8b..c931e0b051 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1065,7 +1065,7 @@ queue_index_deliver(SeqIds, Qi) ->
end, Qi, SeqIds).
queue_index_flush_journal(Qi) ->
- rabbit_queue_index:full_flush_journal(Qi).
+ rabbit_queue_index:flush_journal(Qi).
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
@@ -1230,7 +1230,7 @@ test_variable_queue_dynamic_duration_change() ->
{_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6),
{VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7),
VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8),
- VQ10 = rabbit_variable_queue:full_flush_journal(VQ9),
+ VQ10 = rabbit_variable_queue:flush_journal(VQ9),
{empty, VQ11} = rabbit_variable_queue:fetch(VQ10),
rabbit_variable_queue:terminate(VQ11),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6806a0cdc8..3958216e45 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -36,7 +36,7 @@
ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1,
requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
- full_flush_journal/1, status/1]).
+ flush_journal/1, status/1]).
%%----------------------------------------------------------------------------
@@ -154,7 +154,7 @@
([msg_id()], [ack()], {pid(), any()}, vqstate()) -> vqstate()).
-spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()).
-spec(needs_sync/1 :: (vqstate()) -> boolean()).
--spec(full_flush_journal/1 :: (vqstate()) -> vqstate()).
+-spec(flush_journal/1 :: (vqstate()) -> vqstate()).
-spec(status/1 :: (vqstate()) -> [{atom(), any()}]).
-endif.
@@ -463,9 +463,9 @@ needs_sync(#vqstate { on_sync = {_, _, []} }) ->
needs_sync(_) ->
true.
-full_flush_journal(State = #vqstate { index_state = IndexState }) ->
+flush_journal(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =
- rabbit_queue_index:full_flush_journal(IndexState) }.
+ rabbit_queue_index:flush_journal(IndexState) }.
status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},