summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_queue_index.erl100
-rw-r--r--src/rabbit_tests.erl21
-rw-r--r--src/rabbit_variable_queue.erl13
5 files changed, 60 insertions, 79 deletions
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 34904850be..43488c9274 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -109,7 +109,7 @@ exec erl \
-os_mon start_cpu_sup true \
-os_mon start_disksup false \
-os_mon start_memsup false \
- -os_mon vm_memory_high_watermark 0.4 \
+ -os_mon vm_memory_high_watermark 0.08 \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
${RABBITMQ_CLUSTER_CONFIG_OPTION} \
${RABBITMQ_SERVER_START_ARGS} \
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a79abe8cdb..b0c1ccacb8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -923,5 +923,6 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) ->
VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS),
+ VQS2 = rabbit_variable_queue:full_flush_journal(VQS1),
{hibernate, stop_egress_rate_timer(
- State#q{ variable_queue_state = VQS1 })}.
+ State#q{ variable_queue_state = VQS2 })}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 54d681c5d3..bd8996764e 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,10 +32,9 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, sync_seq_ids/3, can_flush_journal/1,
- flush_journal/1, read_segment_entries/2, next_segment_boundary/1,
- segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1,
- start_msg_store/1]).
+ write_delivered/2, write_acks/2, sync_seq_ids/3, full_flush_journal/1,
+ read_segment_entries/2, next_segment_boundary/1, segment_size/0,
+ find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
%%----------------------------------------------------------------------------
%% The queue disk index
@@ -150,8 +149,7 @@
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync_seq_ids/3 :: ([seq_id()], boolean(), qistate()) -> qistate()).
--spec(can_flush_journal/1 :: (qistate()) -> boolean()).
--spec(flush_journal/1 :: (qistate()) -> qistate()).
+-spec(full_flush_journal/1 :: (qistate()) -> qistate()).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
{[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
@@ -228,40 +226,10 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) ->
StateM
end, State1, SegNumsSet).
-can_flush_journal(#qistate { journal_count = 0 }) ->
- false;
-can_flush_journal(_) ->
- true.
-
-flush_journal(State = #qistate { journal_count = 0 }) ->
+full_flush_journal(State = #qistate { journal_count = 0 }) ->
State;
-flush_journal(State = #qistate { journal_ack_dict = JAckDict,
- journal_del_dict = JDelDict,
- journal_count = JCount }) ->
- SegNum = case dict:fetch_keys(JAckDict) of
- [] -> hd(dict:fetch_keys(JDelDict));
- [N|_] -> N
- end,
- Dels = seg_entries_from_dict(SegNum, JDelDict),
- Acks = seg_entries_from_dict(SegNum, JAckDict),
- State1 = append_dels_to_segment(SegNum, Dels, State),
- State2 = append_acks_to_segment(SegNum, Acks, State1),
- JCount1 = JCount - length(Dels) - length(Acks),
- State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict),
- journal_ack_dict = dict:erase(SegNum, JAckDict),
- journal_count = JCount1 },
- if
- JCount1 == 0 ->
- {Hdl, State4} = get_journal_handle(State3),
- {ok, 0} = file_handle_cache:position(Hdl, bof),
- ok = file_handle_cache:truncate(Hdl),
- ok = file_handle_cache:sync(Hdl),
- State4;
- JCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
- flush_journal(State3);
- true ->
- State3
- end.
+full_flush_journal(State) ->
+ full_flush_journal(flush_journal(State)).
read_segment_entries(InitSeqId, State) ->
{SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
@@ -349,6 +317,47 @@ start_msg_store(DurableQueues) ->
%%----------------------------------------------------------------------------
+%% Journal Flushing
+%%----------------------------------------------------------------------------
+
+flush_journal(State = #qistate { journal_count = 0 }) ->
+ State;
+flush_journal(State = #qistate { journal_ack_dict = JAckDict,
+ journal_del_dict = JDelDict,
+ journal_count = JCount }) ->
+ SegNum = case dict:fetch_keys(JAckDict) of
+ [] -> hd(dict:fetch_keys(JDelDict));
+ [N|_] -> N
+ end,
+ Dels = seg_entries_from_dict(SegNum, JDelDict),
+ Acks = seg_entries_from_dict(SegNum, JAckDict),
+ State1 = append_dels_to_segment(SegNum, Dels, State),
+ State2 = append_acks_to_segment(SegNum, Acks, State1),
+ JCount1 = JCount - length(Dels) - length(Acks),
+ State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict),
+ journal_ack_dict = dict:erase(SegNum, JAckDict),
+ journal_count = JCount1 },
+ if
+ JCount1 == 0 ->
+ {Hdl, State4} = get_journal_handle(State3),
+ {ok, 0} = file_handle_cache:position(Hdl, bof),
+ ok = file_handle_cache:truncate(Hdl),
+ ok = file_handle_cache:sync(Hdl),
+ State4;
+ JCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
+ flush_journal(State3);
+ true ->
+ State3
+ end.
+
+maybe_full_flush(State = #qistate { journal_count = JCount }) ->
+ case JCount > ?MAX_ACK_JOURNAL_ENTRY_COUNT of
+ true -> full_flush_journal(State);
+ false -> State
+ end.
+
+
+%%----------------------------------------------------------------------------
%% Minor Helpers
%%----------------------------------------------------------------------------
@@ -363,19 +372,6 @@ write_to_journal(BinList, SeqIds, Dict,
end, {Dict, JCount}, SeqIds),
{Dict1, State1 #qistate { journal_count = JCount1 }}.
-maybe_full_flush(State = #qistate { journal_count = JCount }) ->
- case JCount > ?MAX_ACK_JOURNAL_ENTRY_COUNT of
- true -> full_flush_journal(State);
- false -> State
- end.
-
-full_flush_journal(State) ->
- case can_flush_journal(State) of
- true -> State1 = flush_journal(State),
- full_flush_journal(State1);
- false -> State
- end.
-
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
Bin = term_to_binary(Name),
Size = 8*size(Bin),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d74f998e33..5b453b627a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1032,15 +1032,7 @@ queue_index_deliver(SeqIds, Qi) ->
end, Qi, SeqIds).
queue_index_flush_journal(Qi) ->
- {_Oks, {false, Qi1}} =
- rabbit_misc:unfold(
- fun ({true, QiN}) ->
- QiM = rabbit_queue_index:flush_journal(QiN),
- {true, ok, {rabbit_queue_index:can_flush_journal(QiM), QiM}};
- ({false, _QiN}) ->
- false
- end, {true, Qi}),
- Qi1.
+ rabbit_queue_index:full_flush_journal(Qi).
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
@@ -1071,7 +1063,6 @@ test_queue_index() ->
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 0, as all the msgs were transient
{0, Qi6} = rabbit_queue_index:init(test_queue()),
- false = rabbit_queue_index:can_flush_journal(Qi6),
{0, 10000, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
@@ -1093,8 +1084,7 @@ test_queue_index() ->
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsMsgIdsB)),
Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15),
- true = rabbit_queue_index:can_flush_journal(Qi16),
- Qi17 = rabbit_queue_index:flush_journal(Qi16),
+ Qi17 = queue_index_flush_journal(Qi16),
%% the entire first segment will have gone as they were firstly
%% transient, and secondly ack'd
SegmentSize = rabbit_queue_index:segment_size(),
@@ -1212,11 +1202,10 @@ test_variable_queue_dynamic_duration_change() ->
{_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6),
{VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7),
VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8),
- VQ10 = rabbit_variable_queue:flush_journal(VQ9),
- VQ11 = rabbit_variable_queue:flush_journal(VQ10),
- {empty, VQ12} = rabbit_variable_queue:fetch(VQ11),
+ VQ10 = rabbit_variable_queue:full_flush_journal(VQ9),
+ {empty, VQ11} = rabbit_variable_queue:fetch(VQ10),
- rabbit_variable_queue:terminate(VQ12),
+ rabbit_variable_queue:terminate(VQ11),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0bce4c2b1a..7c644f59f5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -36,8 +36,7 @@
ram_duration/1, fetch/1, ack/2, len/1, is_empty/1,
maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2,
tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4,
- tx_commit_from_vq/1, needs_sync/1, can_flush_journal/1,
- flush_journal/1, status/1]).
+ tx_commit_from_vq/1, needs_sync/1, full_flush_journal/1, status/1]).
%%----------------------------------------------------------------------------
@@ -150,8 +149,7 @@
([msg_id()], [ack()], {pid(), any()}, vqstate()) -> vqstate()).
-spec(tx_commit_from_vq/1 :: (vqstate()) -> vqstate()).
-spec(needs_sync/1 :: (vqstate()) -> boolean()).
--spec(can_flush_journal/1 :: (vqstate()) -> boolean()).
--spec(flush_journal/1 :: (vqstate()) -> vqstate()).
+-spec(full_flush_journal/1 :: (vqstate()) -> vqstate()).
-spec(status/1 :: (vqstate()) -> [{atom(), any()}]).
-endif.
@@ -485,12 +483,9 @@ needs_sync(#vqstate { on_sync = {_, _, []} }) ->
needs_sync(_) ->
true.
-can_flush_journal(#vqstate { index_state = IndexState }) ->
- rabbit_queue_index:can_flush_journal(IndexState).
-
-flush_journal(State = #vqstate { index_state = IndexState }) ->
+full_flush_journal(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state =
- rabbit_queue_index:flush_journal(IndexState) }.
+ rabbit_queue_index:full_flush_journal(IndexState) }.
status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},