diff options
| -rw-r--r-- | src/file_handle_cache.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 50 |
3 files changed, 63 insertions, 13 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c11fb54bcb..5c81ff2f89 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -149,6 +149,7 @@ -export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). +-export([needs_flush/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). @@ -373,6 +374,24 @@ sync(Ref) -> end end). +needs_flush(Ref) -> + with_handles( + [Ref], + fun([#handle { write_buffer = [] }]) -> + true; + ([Handle]) -> + false + end). + +%% needs_flush(Ref) -> +%% with_handles( +%% [Ref], +%% fun([#handle { write_buffer_size = Size }]) when Size > 30000 -> +%% true; +%% ([Handle]) -> +%% false +%% end). + position(Ref, NewOffset) -> with_flushed_handles( [Ref], diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f03c1d1c76..0c1c20de91 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -22,6 +22,7 @@ next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). +-export([needs_flush/1]). -define(CLEAN_FILENAME, "clean.dot"). @@ -298,6 +299,12 @@ sync(SeqIds, State) -> %% seqids not being in the journal. sync_if([] =/= SeqIds, State). +needs_flush(#qistate { journal_handle = JournalHdl }) -> + file_handle_cache:needs_flush(JournalHdl). + +%% needs_flush(#qistate { dirty_count = 0 }) -> false; +%% needs_flush(_State) -> true. + flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 9b45b55852..3494d4a3df 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -731,21 +731,45 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_timeout(State) -> - case needs_index_sync(State) of - false -> case reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end; - true -> timed +needs_timeout(State = #vqstate { index_state = IndexState }) -> + case rabbit_queue_index:needs_flush(IndexState) of + true -> + timed; + false -> + case needs_index_sync(State) of + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end; + true -> timed + end end. -timeout(State) -> - a(reduce_memory_use(confirm_commit_index(State))). +%% needs_timeout(State = #vqstate { index_state = IndexState }) -> +%% case needs_index_sync(State) of +%% false -> case reduce_memory_use( +%% fun (_Quota, State1) -> {0, State1} end, +%% fun (_Quota, State1) -> State1 end, +%% fun (_Quota, State1) -> {0, State1} end, +%% State) of +%% {true, _State} -> idle; +%% {false, _State} -> false +%% end; +%% true -> timed +%% end. + +timeout(State = #vqstate { index_state = IndexState }) -> + IndexState1 = + case needs_index_sync(State) of + true -> rabbit_queue_index:sync(IndexState); + false -> rabbit_queue_index:flush(IndexState) + end, + State1 = State #vqstate { index_state = IndexState1 }, + a(reduce_memory_use(State1)). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. |
