summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-09 14:37:49 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-09 14:37:49 +0000
commit84703ed36b4b0780dd6a3cf3726bed9c816aba94 (patch)
tree4f1a4f453646393b673b728288e9898b98959ec0 /src
parentcc8ab871619c093d27823ace64488e73ca6a1f87 (diff)
downloadrabbitmq-server-git-84703ed36b4b0780dd6a3cf3726bed9c816aba94.tar.gz
Syncing instead of flushing.
This stops needs_timeout and timeout from going in a loop. However, since rabbit_queue_index:needs_sync works looking at unsynced_msg_ids, this doesn't preserving acks when the broker is terminated abruptedly, since the timeout won't be triggered (needs_sync will return false). If needs_sync is defined inspecting the write_buffer if the file_handle_cache, needs_timeout and timeout go in a loop, for reason that are not clear to me now.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_variable_queue.erl8
2 files changed, 8 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index a5620a8862..01bedba4b8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -18,7 +18,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, needs_flush/1,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -285,8 +285,8 @@ ack(SeqIds, State) ->
%% This is only called when there are outstanding confirms and the
%% queue is idle.
-sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
- sync_if(not gb_sets:is_empty(MsgIds), State).
+sync(State) ->
+ sync_if(needs_sync(State), State).
sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack to
@@ -298,12 +298,12 @@ sync(SeqIds, State) ->
%% seqids not being in the journal.
sync_if([] =/= SeqIds, State).
+needs_sync(State = #qistate { unsynced_msg_ids = MsgIds }) ->
+ not gb_sets:is_empty(MsgIds).
+
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
-needs_flush(State = #qistate { dirty_count = 0}) -> false;
-needs_flush(_State) -> true.
-
read(StartEnd, StartEnd, State) ->
{[], State};
read(Start, End, State = #qistate { segments = Segments,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6f1c3847bb..3e605f049d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -734,18 +734,14 @@ ram_duration(State = #vqstate {
needs_timeout(State = #vqstate { index_state = IndexState }) ->
case needs_index_sync(State) of
true -> timed;
- false -> case rabbit_queue_index:needs_flush(IndexState) of
+ false -> case rabbit_queue_index:needs_sync(IndexState) of
true -> idle;
false -> false
end
end.
timeout(State = #vqstate { index_state = IndexState }) ->
- IndexState1 =
- case needs_index_sync(State) of
- true -> rabbit_queue_index:sync(IndexState);
- false -> rabbit_queue_index:flush(IndexState)
- end,
+ IndexState1 = rabbit_queue_index:sync(IndexState),
State1 = State #vqstate { index_state = IndexState1 },
a(case reduce_memory_use(
fun (_Quota, State1) -> {0, State1} end,