diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-02-29 15:07:50 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-02-29 15:07:50 +0000 |
| commit | b50cd3e80dd22af36ed209a7ece12290a9a8cb31 (patch) | |
| tree | 91349ecd36e44dbb18bc08adfad6c42ae8811ffb /src | |
| parent | 4435c748294379add22aca9e3ee7273113daee18 (diff) | |
| parent | 5771884a66b9b8343b8ab11217a7b6049f321225 (diff) | |
| download | rabbitmq-server-git-b50cd3e80dd22af36ed209a7ece12290a9a8cb31.tar.gz | |
Merge bug24558
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 43 |
4 files changed, 50 insertions, 53 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 59a0ab1cc0..f3b4dbafa2 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -143,9 +143,9 @@ -behaviour(gen_server2). -export([register_callback/3]). --export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, - current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, - set_maximum_since_use/1, delete/1, clear/1]). +-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, + truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, + copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -373,6 +373,13 @@ sync(Ref) -> end end). +needs_sync(Ref) -> + with_handles( + [Ref], + fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false; + ([_Handle]) -> true + end). + position(Ref, NewOffset) -> with_flushed_handles( [Ref], diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fd2d7214d2..9494367764 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -279,9 +279,9 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> confirm_messages(MsgIds, State#q{ backing_queue_state = BQS1}))), case BQ:needs_timeout(BQS1) of - false -> {stop_sync_timer(State1), hibernate}; - idle -> {stop_sync_timer(State1), 0 }; - timed -> {ensure_sync_timer(State1), 0 } + false -> {stop_sync_timer(State1), hibernate }; + idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; + timed -> {ensure_sync_timer(State1), 0 } end. backing_queue_module(#amqqueue{arguments = Args}) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4c8793f1ad..3d07e8b055 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -18,8 +18,8 @@ -export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, - publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3, - next_segment_boundary/1, bounds/1, recover/1]). + publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, + read/3, next_segment_boundary/1, bounds/1, recover/1]). -export([add_queue_ttl/0]). @@ -207,7 +207,8 @@ -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). --spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). +-spec(sync/1 :: (qistate()) -> qistate()). +-spec(needs_sync/1 :: (qistate()) -> boolean()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> {[{rabbit_types:msg_id(), seq_id(), @@ -283,20 +284,18 @@ deliver(SeqIds, State) -> ack(SeqIds, State) -> deliver_or_ack(ack, SeqIds, State). -%% This is only called when there are outstanding confirms and the -%% queue is idle. -sync(State = #qistate { unsynced_msg_ids = MsgIds }) -> - sync_if(not gb_sets:is_empty(MsgIds), State). - -sync(SeqIds, State) -> - %% The SeqIds here contains the SeqId of every publish and ack to - %% be sync'ed. Ideally we should go through these seqids and only - %% sync the journal if the pubs or acks appear in the - %% journal. However, this would be complex to do, and given that - %% the variable queue publishes and acks to the qi, and then - %% syncs, all in one operation, there is no possibility of the - %% seqids not being in the journal. - sync_if([] =/= SeqIds, State). +%% This is called when there are outstanding confirms or when the +%% queue is idle and the journal needs syncing (see needs_sync/1). +sync(State = #qistate { journal_handle = undefined }) -> + State; +sync(State = #qistate { journal_handle = JournalHdl }) -> + ok = file_handle_cache:sync(JournalHdl), + notify_sync(State). + +needs_sync(#qistate { journal_handle = undefined }) -> + false; +needs_sync(#qistate { journal_handle = JournalHdl }) -> + file_handle_cache:needs_sync(JournalHdl). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -707,14 +706,6 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -sync_if(false, State) -> - State; -sync_if(_Bool, State = #qistate { journal_handle = undefined }) -> - State; -sync_if(true, State = #qistate { journal_handle = JournalHdl }) -> - ok = file_handle_cache:sync(JournalHdl), - notify_sync(State). - notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> OnSyncFun(UG), State #qistate { unsynced_msg_ids = gb_sets:new() }. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1b32d21197..46f6d6c1f9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -750,21 +750,27 @@ ram_duration(State = #vqstate { ram_msg_count_prev = RamMsgCount, ram_ack_count_prev = RamAckCount }}. -needs_timeout(State) -> - case needs_index_sync(State) of - false -> case reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end; - true -> timed +needs_timeout(State = #vqstate { index_state = IndexState }) -> + case must_sync_index(State) of + true -> timed; + false -> + case rabbit_queue_index:needs_sync(IndexState) of + true -> idle; + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end + end end. -timeout(State) -> - a(reduce_memory_use(confirm_commit_index(State))). +timeout(State = #vqstate { index_state = IndexState }) -> + IndexState1 = rabbit_queue_index:sync(IndexState), + State1 = State #vqstate { index_state = IndexState1 }, + a(reduce_memory_use(State1)). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. @@ -1281,13 +1287,6 @@ find_persistent_count(LensByStore) -> %% Internal plumbing for confirms (aka publisher acks) %%---------------------------------------------------------------------------- -confirm_commit_index(State = #vqstate { index_state = IndexState }) -> - case needs_index_sync(State) of - true -> State #vqstate { - index_state = rabbit_queue_index:sync(IndexState) }; - false -> State - end. - record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC, @@ -1297,8 +1296,8 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, unconfirmed = gb_sets:difference(UC, MsgIdSet), confirmed = gb_sets:union (C, MsgIdSet) }. -needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> +must_sync_index(#vqstate { msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> %% If UC is empty then by definition, MIOD and MOD are also empty %% and there's nothing that can be pending a sync. |
