diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
2 files changed, 8 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a5620a8862..01bedba4b8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,7 +18,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, needs_flush/1, + publish/5, deliver/2, ack/2, sync/1, sync/2, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -285,8 +285,8 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. -sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> - sync_if(not gb_sets:is_empty(MsgIds), State). +sync(State) -> + sync_if(needs_sync(State), State). sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack to @@ -298,12 +298,12 @@ sync(SeqIds, State) -> %% seqids not being in the journal. sync_if([] =/= SeqIds, State). +needs_sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> + not gb_sets:is_empty(MsgIds). + flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). -needs_flush(State = #qistate { dirty_count = 0}) -> false; -needs_flush(_State) -> true. - read(StartEnd, StartEnd, State) -> {[], State}; read(Start, End, State = #qistate { segments = Segments, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6f1c3847bb..3e605f049d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -734,18 +734,14 @@ ram_duration(State = #vqstate { needs_timeout(State = #vqstate { index_state = IndexState }) -> case needs_index_sync(State) of true -> timed; - false -> case rabbit_queue_index:needs_flush(IndexState) of + false -> case rabbit_queue_index:needs_sync(IndexState) of true -> idle; false -> false end end. timeout(State = #vqstate { index_state = IndexState }) -> - IndexState1 = - case needs_index_sync(State) of - true -> rabbit_queue_index:sync(IndexState); - false -> rabbit_queue_index:flush(IndexState) - end, + IndexState1 = rabbit_queue_index:sync(IndexState), State1 = State #vqstate { index_state = IndexState1 }, a(case reduce_memory_use( fun (_Quota, State1) -> {0, State1} end, |
