diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-09 14:37:49 +0000 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-09 14:37:49 +0000 |
| commit | 84703ed36b4b0780dd6a3cf3726bed9c816aba94 (patch) | |
| tree | 4f1a4f453646393b673b728288e9898b98959ec0 /src | |
| parent | cc8ab871619c093d27823ace64488e73ca6a1f87 (diff) | |
| download | rabbitmq-server-git-84703ed36b4b0780dd6a3cf3726bed9c816aba94.tar.gz | |
Syncing instead of flushing.
This stops needs_timeout and timeout from going in a loop. However, since
rabbit_queue_index:needs_sync works looking at unsynced_msg_ids, this doesn't
preserving acks when the broker is terminated abruptedly, since the timeout
won't be triggered (needs_sync will return false).
If needs_sync is defined inspecting the write_buffer if the file_handle_cache,
needs_timeout and timeout go in a loop, for reason that are not clear to me
now.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
2 files changed, 8 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index a5620a8862..01bedba4b8 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,7 +18,7 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, needs_flush/1, + publish/5, deliver/2, ack/2, sync/1, sync/2, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -285,8 +285,8 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. -sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> - sync_if(not gb_sets:is_empty(MsgIds), State). +sync(State) -> + sync_if(needs_sync(State), State). sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack to @@ -298,12 +298,12 @@ sync(SeqIds, State) -> %% seqids not being in the journal. sync_if([] =/= SeqIds, State). +needs_sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> + not gb_sets:is_empty(MsgIds). + flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). -needs_flush(State = #qistate { dirty_count = 0}) -> false; -needs_flush(_State) -> true. - read(StartEnd, StartEnd, State) -> {[], State}; read(Start, End, State = #qistate { segments = Segments, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6f1c3847bb..3e605f049d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -734,18 +734,14 @@ ram_duration(State = #vqstate { needs_timeout(State = #vqstate { index_state = IndexState }) -> case needs_index_sync(State) of true -> timed; - false -> case rabbit_queue_index:needs_flush(IndexState) of + false -> case rabbit_queue_index:needs_sync(IndexState) of true -> idle; false -> false end end. timeout(State = #vqstate { index_state = IndexState }) -> - IndexState1 = - case needs_index_sync(State) of - true -> rabbit_queue_index:sync(IndexState); - false -> rabbit_queue_index:flush(IndexState) - end, + IndexState1 = rabbit_queue_index:sync(IndexState), State1 = State #vqstate { index_state = IndexState1 }, a(case reduce_memory_use( fun (_Quota, State1) -> {0, State1} end, |
