diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-16 17:34:34 +0000 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-16 17:34:34 +0000 |
| commit | 06f5ec199e9314aa1de26c4d673e53699c4a3549 (patch) | |
| tree | a0702ca00df51ea6b30bf72b094a1b1569cdaff0 /src | |
| parent | c7f164e1725dfc0a1f122283ed4d0eb5183c1dd8 (diff) | |
| parent | e2bec715642c14dcf92324849eb76739e1093af3 (diff) | |
| download | rabbitmq-server-git-06f5ec199e9314aa1de26c4d673e53699c4a3549.tar.gz | |
Merge default
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 36 |
4 files changed, 40 insertions, 29 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 59a0ab1cc0..f3b4dbafa2 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -143,9 +143,9 @@ -behaviour(gen_server2). -export([register_callback/3]). --export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, - current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, - set_maximum_since_use/1, delete/1, clear/1]). +-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, + truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, + copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -373,6 +373,13 @@ sync(Ref) -> end end). +needs_sync(Ref) -> + with_handles( + [Ref], + fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false; + ([_Handle]) -> true + end). + position(Ref, NewOffset) -> with_flushed_handles( [Ref], diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 12cd0c93ff..34efbde84d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -23,6 +23,7 @@ -define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(IDLE_TIMEOUT, 10). -define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined, needs_confirming = false}). @@ -250,9 +251,9 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> confirm_messages(MsgIds, State#q{ backing_queue_state = BQS1}))), case BQ:needs_timeout(BQS1) of - false -> {stop_sync_timer(State1), hibernate}; - idle -> {stop_sync_timer(State1), 0 }; - timed -> {ensure_sync_timer(State1), 0 } + false -> {stop_sync_timer(State1), hibernate }; + idle -> {stop_sync_timer(State1), ?IDLE_TIMEOUT}; + timed -> {ensure_sync_timer(State1), 0 } end. backing_queue_module(#amqqueue{arguments = Args}) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4c8793f1ad..366e1b1be1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,8 +18,8 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, - next_segment_boundary/1, bounds/1, recover/1]). + publish/5, deliver/2, ack/2, sync/1, sync/2, needs_sync/1, flush/1, + read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -285,8 +285,8 @@ ack(SeqIds, State) -> %% This is only called when there are outstanding confirms and the %% queue is idle. -sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> - sync_if(not gb_sets:is_empty(MsgIds), State). +sync(State) -> + sync_if(true, State). sync(SeqIds, State) -> %% The SeqIds here contains the SeqId of every publish and ack to @@ -298,6 +298,11 @@ sync(SeqIds, State) -> %% seqids not being in the journal. sync_if([] =/= SeqIds, State). +needs_sync(#qistate { journal_handle = undefined }) -> + false; +needs_sync(#qistate { journal_handle = JournalHdl }) -> + file_handle_cache:needs_sync(JournalHdl). + flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 52eb168a42..ea18ef2a2b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -731,21 +731,26 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_timeout(State) -> +needs_timeout(State = #vqstate { index_state = IndexState }) -> case needs_index_sync(State) of - false -> case reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end; - true -> timed + true -> timed; + false -> case rabbit_queue_index:needs_sync(IndexState) of + true -> idle; + false -> false + end end. -timeout(State) -> - a(reduce_memory_use(confirm_commit_index(State))). +timeout(State = #vqstate { index_state = IndexState }) -> + IndexState1 = rabbit_queue_index:sync(IndexState), + State1 = State #vqstate { index_state = IndexState1 }, + a(case reduce_memory_use( + fun (_Quota, State2) -> {0, State2} end, + fun (_Quota, State2) -> State2 end, + fun (_Quota, State2) -> {0, State2} end, + State) of + {true, _State} -> reduce_memory_use(State1); + {false, _State} -> State1 + end). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -1262,13 +1267,6 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- -confirm_commit_index(State = #vqstate { index_state = IndexState }) -> - case needs_index_sync(State) of - true -> State #vqstate { - index_state = rabbit_queue_index:sync(IndexState) }; - false -> State - end. - record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC, |
