summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl13
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_queue_index.erl41
-rw-r--r--src/rabbit_variable_queue.erl43
4 files changed, 50 insertions, 53 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 59a0ab1cc0..f3b4dbafa2 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -143,9 +143,9 @@
-behaviour(gen_server2).
-export([register_callback/3]).
--export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
- current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3,
- set_maximum_since_use/1, delete/1, clear/1]).
+-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
+ truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
+ copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
@@ -373,6 +373,13 @@ sync(Ref) ->
end
end).
+needs_sync(Ref) ->
+ with_handles(
+ [Ref],
+ fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false;
+ ([_Handle]) -> true
+ end).
+
position(Ref, NewOffset) ->
with_flushed_handles(
[Ref],
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fd2d7214d2..9494367764 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -279,9 +279,9 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
confirm_messages(MsgIds, State#q{
backing_queue_state = BQS1}))),
case BQ:needs_timeout(BQS1) of
- false -> {stop_sync_timer(State1), hibernate};
- idle -> {stop_sync_timer(State1), 0 };
- timed -> {ensure_sync_timer(State1), 0 }
+ false -> {stop_sync_timer(State1), hibernate };
+ idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
+ timed -> {ensure_sync_timer(State1), 0 }
end.
backing_queue_module(#amqqueue{arguments = Args}) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4c8793f1ad..3d07e8b055 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -18,8 +18,8 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
- next_segment_boundary/1, bounds/1, recover/1]).
+ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
+ read/3, next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -207,7 +207,8 @@
-> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
+-spec(sync/1 :: (qistate()) -> qistate()).
+-spec(needs_sync/1 :: (qistate()) -> boolean()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
@@ -283,20 +284,18 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
-%% This is only called when there are outstanding confirms and the
-%% queue is idle.
-sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
- sync_if(not gb_sets:is_empty(MsgIds), State).
-
-sync(SeqIds, State) ->
- %% The SeqIds here contains the SeqId of every publish and ack to
- %% be sync'ed. Ideally we should go through these seqids and only
- %% sync the journal if the pubs or acks appear in the
- %% journal. However, this would be complex to do, and given that
- %% the variable queue publishes and acks to the qi, and then
- %% syncs, all in one operation, there is no possibility of the
- %% seqids not being in the journal.
- sync_if([] =/= SeqIds, State).
+%% This is called when there are outstanding confirms or when the
+%% queue is idle and the journal needs syncing (see needs_sync/1).
+sync(State = #qistate { journal_handle = undefined }) ->
+ State;
+sync(State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
+needs_sync(#qistate { journal_handle = undefined }) ->
+ false;
+needs_sync(#qistate { journal_handle = JournalHdl }) ->
+ file_handle_cache:needs_sync(JournalHdl).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -707,14 +706,6 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-sync_if(false, State) ->
- State;
-sync_if(_Bool, State = #qistate { journal_handle = undefined }) ->
- State;
-sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
- ok = file_handle_cache:sync(JournalHdl),
- notify_sync(State).
-
notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(UG),
State #qistate { unsynced_msg_ids = gb_sets:new() }.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1b32d21197..46f6d6c1f9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -750,21 +750,27 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State) ->
- case needs_index_sync(State) of
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end;
- true -> timed
+needs_timeout(State = #vqstate { index_state = IndexState }) ->
+ case must_sync_index(State) of
+ true -> timed;
+ false ->
+ case rabbit_queue_index:needs_sync(IndexState) of
+ true -> idle;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
+ end
end.
-timeout(State) ->
- a(reduce_memory_use(confirm_commit_index(State))).
+timeout(State = #vqstate { index_state = IndexState }) ->
+ IndexState1 = rabbit_queue_index:sync(IndexState),
+ State1 = State #vqstate { index_state = IndexState1 },
+ a(reduce_memory_use(State1)).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1281,13 +1287,6 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
-confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
- case needs_index_sync(State) of
- true -> State #vqstate {
- index_state = rabbit_queue_index:sync(IndexState) };
- false -> State
- end.
-
record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC,
@@ -1297,8 +1296,8 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = gb_sets:difference(UC, MsgIdSet),
confirmed = gb_sets:union (C, MsgIdSet) }.
-needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
%% If UC is empty then by definition, MIOD and MOD are also empty
%% and there's nothing that can be pending a sync.