summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-07 14:58:53 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-07 14:58:53 +0000
commit6296c7e6436d44dedce9f52aeb52b127e69788dc (patch)
treea92cffef521c53e78f60f3d9f4334e9ce0dfd899
parent9b6c99384e421f767d3b353ad9f0d793f3072d28 (diff)
downloadrabbitmq-server-git-6296c7e6436d44dedce9f52aeb52b127e69788dc.tar.gz
Flushing periodically, trying out various way to find out when to flush.
-rw-r--r--src/file_handle_cache.erl19
-rw-r--r--src/rabbit_queue_index.erl7
-rw-r--r--src/rabbit_variable_queue.erl50
3 files changed, 63 insertions, 13 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index c11fb54bcb..5c81ff2f89 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -149,6 +149,7 @@
-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
+-export([needs_flush/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/2]).
@@ -373,6 +374,24 @@ sync(Ref) ->
end
end).
+needs_flush(Ref) ->
+ with_handles(
+ [Ref],
+ fun([#handle { write_buffer = [] }]) ->
+ true;
+ ([Handle]) ->
+ false
+ end).
+
+%% needs_flush(Ref) ->
+%% with_handles(
+%% [Ref],
+%% fun([#handle { write_buffer_size = Size }]) when Size > 30000 ->
+%% true;
+%% ([Handle]) ->
+%% false
+%% end).
+
position(Ref, NewOffset) ->
with_flushed_handles(
[Ref],
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f03c1d1c76..0c1c20de91 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -22,6 +22,7 @@
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
+-export([needs_flush/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -298,6 +299,12 @@ sync(SeqIds, State) ->
%% seqids not being in the journal.
sync_if([] =/= SeqIds, State).
+needs_flush(#qistate { journal_handle = JournalHdl }) ->
+ file_handle_cache:needs_flush(JournalHdl).
+
+%% needs_flush(#qistate { dirty_count = 0 }) -> false;
+%% needs_flush(_State) -> true.
+
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9b45b55852..3494d4a3df 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -731,21 +731,45 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State) ->
- case needs_index_sync(State) of
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end;
- true -> timed
+needs_timeout(State = #vqstate { index_state = IndexState }) ->
+ case rabbit_queue_index:needs_flush(IndexState) of
+ true ->
+ timed;
+ false ->
+ case needs_index_sync(State) of
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end;
+ true -> timed
+ end
end.
-timeout(State) ->
- a(reduce_memory_use(confirm_commit_index(State))).
+%% needs_timeout(State = #vqstate { index_state = IndexState }) ->
+%% case needs_index_sync(State) of
+%% false -> case reduce_memory_use(
+%% fun (_Quota, State1) -> {0, State1} end,
+%% fun (_Quota, State1) -> State1 end,
+%% fun (_Quota, State1) -> {0, State1} end,
+%% State) of
+%% {true, _State} -> idle;
+%% {false, _State} -> false
+%% end;
+%% true -> timed
+%% end.
+
+timeout(State = #vqstate { index_state = IndexState }) ->
+ IndexState1 =
+ case needs_index_sync(State) of
+ true -> rabbit_queue_index:sync(IndexState);
+ false -> rabbit_queue_index:flush(IndexState)
+ end,
+ State1 = State #vqstate { index_state = IndexState1 },
+ a(reduce_memory_use(State1)).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.