summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl19
-rw-r--r--src/rabbit_queue_index.erl7
-rw-r--r--src/rabbit_variable_queue.erl42
3 files changed, 12 insertions, 56 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 5c81ff2f89..c11fb54bcb 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -149,7 +149,6 @@
-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
--export([needs_flush/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/2]).
@@ -374,24 +373,6 @@ sync(Ref) ->
end
end).
-needs_flush(Ref) ->
- with_handles(
- [Ref],
- fun([#handle { write_buffer = [] }]) ->
- true;
- ([Handle]) ->
- false
- end).
-
-%% needs_flush(Ref) ->
-%% with_handles(
-%% [Ref],
-%% fun([#handle { write_buffer_size = Size }]) when Size > 30000 ->
-%% true;
-%% ([Handle]) ->
-%% false
-%% end).
-
position(Ref, NewOffset) ->
with_flushed_handles(
[Ref],
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0c1c20de91..f03c1d1c76 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -22,7 +22,6 @@
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
--export([needs_flush/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -299,12 +298,6 @@ sync(SeqIds, State) ->
%% seqids not being in the journal.
sync_if([] =/= SeqIds, State).
-needs_flush(#qistate { journal_handle = JournalHdl }) ->
- file_handle_cache:needs_flush(JournalHdl).
-
-%% needs_flush(#qistate { dirty_count = 0 }) -> false;
-%% needs_flush(_State) -> true.
-
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3494d4a3df..2491ac301b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -731,37 +731,12 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State = #vqstate { index_state = IndexState }) ->
- case rabbit_queue_index:needs_flush(IndexState) of
- true ->
- timed;
- false ->
- case needs_index_sync(State) of
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end;
- true -> timed
- end
+needs_timeout(State) ->
+ case needs_index_sync(State) of
+ false -> idle;
+ true -> timed
end.
-%% needs_timeout(State = #vqstate { index_state = IndexState }) ->
-%% case needs_index_sync(State) of
-%% false -> case reduce_memory_use(
-%% fun (_Quota, State1) -> {0, State1} end,
-%% fun (_Quota, State1) -> State1 end,
-%% fun (_Quota, State1) -> {0, State1} end,
-%% State) of
-%% {true, _State} -> idle;
-%% {false, _State} -> false
-%% end;
-%% true -> timed
-%% end.
-
timeout(State = #vqstate { index_state = IndexState }) ->
IndexState1 =
case needs_index_sync(State) of
@@ -769,7 +744,14 @@ timeout(State = #vqstate { index_state = IndexState }) ->
false -> rabbit_queue_index:flush(IndexState)
end,
State1 = State #vqstate { index_state = IndexState1 },
- a(reduce_memory_use(State1)).
+ a(case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> reduce_memory_use(State1);
+ {false, _State} -> State1
+ end).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.