summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_queue_index.erl39
-rw-r--r--src/rabbit_tests.erl32
-rw-r--r--src/rabbit_variable_queue.erl10
4 files changed, 69 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cd94c6e450..180a9f8a07 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -153,28 +153,32 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State = #q { variable_queue_state = VQS }) ->
+next_state(State = #q{variable_queue_state = VQS}) ->
next_state1(State, rabbit_variable_queue:needs_sync(VQS)).
-next_state1(State = #q { sync_timer_ref = undefined }, true) ->
+next_state1(State = #q{sync_timer_ref = undefined}, true) ->
{start_sync_timer(State), 0};
next_state1(State, true) ->
{State, 0};
-next_state1(State = #q { sync_timer_ref = undefined }, false) ->
- {State, hibernate};
+next_state1(State = #q{sync_timer_ref = undefined,
+ variable_queue_state = VQS}, false) ->
+ {State, case rabbit_variable_queue:can_flush_journal(VQS) of
+ true -> 0;
+ false -> hibernate
+ end};
next_state1(State, false) ->
{stop_sync_timer(State), 0}.
-start_sync_timer(State = #q { sync_timer_ref = undefined }) ->
+start_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, rabbit_amqqueue,
tx_commit_vq_callback, [self()]),
- State #q { sync_timer_ref = TRef }.
+ State#q{sync_timer_ref = TRef}.
-stop_sync_timer(State = #q { sync_timer_ref = TRef }) ->
+stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- State #q { sync_timer_ref = undefined }.
+ State#q{sync_timer_ref = undefined}.
-assert_invariant(#q { active_consumers = AC, variable_queue_state = VQS }) ->
+assert_invariant(#q{active_consumers = AC, variable_queue_state = VQS}) ->
true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)).
lookup_ch(ChPid) ->
@@ -868,6 +872,13 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
+handle_info(timeout, State = #q{variable_queue_state = VQS,
+ sync_timer_ref = undefined}) ->
+ %% if sync_timer_ref is undefined then we must have set the
+ %% timeout to zero because we thought we could flush the journal
+ noreply(State#q{variable_queue_state =
+ rabbit_variable_queue:flush_journal(VQS)});
+
handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
noreply(
run_message_queue(
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e3057f9cb4..39116b0dd2 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/3,
+ write_delivered/2, write_acks/2, can_flush_journal/1, flush_journal/1, sync_seq_ids/3,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
@@ -136,7 +136,8 @@
-> qistate()).
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}).
+-spec(flush_journal/1 :: (qistate()) -> qistate()).
+-spec(can_flush_journal/1 :: (qistate()) -> boolean()).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
{( [{msg_id(), seq_id(), boolean(), boolean()}]
| 'not_found'), qistate()}).
@@ -211,12 +212,6 @@ write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict,
false -> State2
end.
-full_flush_journal(State) ->
- case flush_journal(State) of
- {true, State1} -> full_flush_journal(State1);
- {false, State1} -> State1
- end.
-
sync_seq_ids(SeqIds, SyncAckJournal, State) ->
State1 = case SyncAckJournal of
true -> {Hdl, State2} = get_journal_handle(State),
@@ -237,8 +232,13 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) ->
StateM
end, State1, SegNumsSet).
+can_flush_journal(#qistate { journal_ack_count = 0 }) ->
+ false;
+can_flush_journal(_) ->
+ true.
+
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
- {false, State};
+ State;
flush_journal(State = #qistate { journal_ack_dict = JAckDict,
journal_ack_count = JAckCount }) ->
[SegNum|_] = dict:fetch_keys(JAckDict),
@@ -253,11 +253,11 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict,
ok = file_handle_cache:position(Hdl, bof),
ok = file_handle_cache:truncate(Hdl),
ok = file_handle_cache:sync(Hdl),
- {false, State3};
+ State3;
JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
flush_journal(State2);
true ->
- {true, State2}
+ State2
end.
read_segment_entries(InitSeqId, State) ->
@@ -348,6 +348,13 @@ start_msg_store(DurableQueues) ->
%% Minor Helpers
%%----------------------------------------------------------------------------
+full_flush_journal(State) ->
+ case can_flush_journal(State) of
+ true -> State1 = flush_journal(State),
+ full_flush_journal(State1);
+ false -> State
+ end.
+
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
Bin = term_to_binary(Name),
Size = 8*size(Bin),
@@ -421,6 +428,11 @@ add_ack_to_ack_dict(SeqId, ADict) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
+all_segment_nums(Dir) ->
+ [list_to_integer(
+ lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
+ || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
+
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
@@ -454,11 +466,6 @@ queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Entries],
%% Startup Functions
%%----------------------------------------------------------------------------
-all_segment_nums(Dir) ->
- [list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
- || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
-
find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) ->
SegNums = all_segment_nums(Dir),
{TotalMsgCount, State1} =
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8114247604..ebd8432a20 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1040,31 +1040,33 @@ test_queue_index() ->
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 0, as all the msgs were transient
{0, Qi6} = rabbit_queue_index:init(test_queue()),
- {false, Qi7} = rabbit_queue_index:flush_journal(Qi6),
- {0, 10001, Qi8} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi7),
- {Qi9, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi8),
- {0, 20001, Qi10} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi9),
- {ReadB, Qi11} = rabbit_queue_index:read_segment_entries(0, Qi10),
+ false = rabbit_queue_index:can_flush_journal(Qi6),
+ {0, 10001, Qi7} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
+ {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
+ {0, 20001, Qi9} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8),
+ {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsMsgIdsB)),
- _Qi12 = rabbit_queue_index:terminate(Qi11),
+ _Qi11 = rabbit_queue_index:terminate(Qi10),
ok = rabbit_msg_store:stop(),
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 10000
LenB = length(SeqIdsB),
- {LenB, Qi13} = rabbit_queue_index:init(test_queue()),
- {0, 20001, Qi14} =
- rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi13),
- Qi15 = lists:foldl(
+ {LenB, Qi12} = rabbit_queue_index:init(test_queue()),
+ {0, 20001, Qi13} =
+ rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12),
+ Qi14 = lists:foldl(
fun (SeqId, QiN) ->
rabbit_queue_index:write_delivered(SeqId, QiN)
- end, Qi14, SeqIdsB),
- {ReadC, Qi16} = rabbit_queue_index:read_segment_entries(0, Qi15),
+ end, Qi13, SeqIdsB),
+ {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsMsgIdsB)),
- Qi17 = rabbit_queue_index:write_acks(SeqIdsB, Qi16),
+ Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15),
+ true = rabbit_queue_index:can_flush_journal(Qi16),
+ Qi17 = rabbit_queue_index:flush_journal(Qi16),
{0, 20001, Qi18} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
_Qi19 = rabbit_queue_index:terminate(Qi18),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 33e09c113d..da56487e01 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -35,7 +35,8 @@
set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1,
ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
- tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1]).
+ tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
+ can_flush_journal/1, flush_journal/1]).
%%----------------------------------------------------------------------------
@@ -412,6 +413,13 @@ needs_sync(#vqstate { on_sync = {_, _, []} }) ->
needs_sync(_) ->
true.
+can_flush_journal(#vqstate { index_state = IndexState }) ->
+ rabbit_queue_index:can_flush_journal(IndexState).
+
+flush_journal(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state =
+ rabbit_queue_index:flush_journal(IndexState) }.
+
%%----------------------------------------------------------------------------
persistent_msg_ids(Pubs) ->