diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 10 |
2 files changed, 16 insertions, 7 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c11fb54bcb..65cbe2c8be 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -143,9 +143,9 @@ -behaviour(gen_server2). -export([register_callback/3]). --export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, - current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, - set_maximum_since_use/1, delete/1, clear/1]). +-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, + truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, + copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -373,6 +373,13 @@ sync(Ref) -> end end). +needs_sync(Ref) -> + with_handles( + [Ref], + fun ([#handle { write_buffer = [] }]) -> false; + ([_Handle]) -> true + end). + position(Ref, NewOffset) -> with_flushed_handles( [Ref], diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 01bedba4b8..0a12b289db 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -285,8 +285,8 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. -sync(State) -> - sync_if(needs_sync(State), State). +sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> + sync_if(true, State). sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack to @@ -298,8 +298,10 @@ sync(SeqIds, State) -> %% seqids not being in the journal. sync_if([] =/= SeqIds, State). -needs_sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> - not gb_sets:is_empty(MsgIds). +needs_sync(#qistate { journal_handle = undefined }) -> + false; +needs_sync(#qistate { journal_handle = JournalHdl }) -> + file_handle_cache:needs_sync(JournalHdl). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). |
