diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-23 13:38:56 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-23 13:38:56 +0100 |
| commit | 21cca2413fcf784f44058c207b59bbc92e519856 (patch) | |
| tree | 9732442adea891f2978548e4df2c23bcf6f2fa13 /src | |
| parent | 882ca0d5945b0c092e88aa6d9a2f5760fc1f455b (diff) | |
| download | rabbitmq-server-git-21cca2413fcf784f44058c207b59bbc92e519856.tar.gz | |
finally added support for smart syncing of the queue index on commit. Note performance is pretty bad as although the msg_store is doing coalescing, the queue index isn't yet, so it's fractionally above one call to file:sync per commit
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 9 |
2 files changed, 20 insertions, 11 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 57abfa9dc0..7259eaa23f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,7 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, flush_journal/1, sync_all/1, + write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/2, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). @@ -217,11 +217,21 @@ full_flush_journal(State) -> {false, State1} -> State1 end. -sync_all(State = #qistate { seg_num_handles = SegHdls }) -> - ok = dict:fold(fun (_Key, Hdl, ok) -> - file_handle_cache:sync(Hdl) - end, ok, SegHdls), - State. +sync_seq_ids(SeqIds, State) -> + {Hdl, State1} = get_journal_handle(State), + ok = file_handle_cache:sync(Hdl), + SegNumsSet = + lists:foldl( + fun (SeqId, Set) -> + {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + sets:add_element(SegNum, Set) + end, sets:new(), SeqIds), + sets:fold( + fun (SegNum, StateN) -> + {Hdl1, StateM} = get_seg_handle(SegNum, StateN), + ok = file_handle_cache:sync(Hdl1), + StateM + end, State1, SegNumsSet). flush_journal(State = #qistate { journal_ack_count = 0 }) -> {false, State}; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7a85b30272..4dce1f6a64 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -379,15 +379,14 @@ tx_commit(Pubs, AckTags, From, State) -> end. do_tx_commit(Pubs, AckTags, From, State) -> - {_PubSeqIds, State1} = + State1 = ack(AckTags, State), + {PubSeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg, {SeqIdsAcc, StateN}) -> {SeqId, StateN1} = publish(Msg, false, true, StateN), {[SeqId | SeqIdsAcc], StateN1} - end, {[], State}, Pubs), - %% TODO need to do something here about syncing the queue index, PubSeqIds - State2 = #vqstate { index_state = IndexState } = ack(AckTags, State1), - IndexState1 = rabbit_queue_index:sync_all(IndexState), + end, {[], State1}, Pubs), + IndexState1 = rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState), gen_server2:reply(From, ok), State2 #vqstate { index_state = IndexState1 }. |
