summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-23 13:38:56 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-23 13:38:56 +0100
commit21cca2413fcf784f44058c207b59bbc92e519856 (patch)
tree9732442adea891f2978548e4df2c23bcf6f2fa13 /src
parent882ca0d5945b0c092e88aa6d9a2f5760fc1f455b (diff)
downloadrabbitmq-server-git-21cca2413fcf784f44058c207b59bbc92e519856.tar.gz
finally added support for smart syncing of the queue index on commit. Note performance is pretty bad as although the msg_store is doing coalescing, the queue index isn't yet, so it's fractionally above one call to file:sync per commit
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl22
-rw-r--r--src/rabbit_variable_queue.erl9
2 files changed, 20 insertions, 11 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 57abfa9dc0..7259eaa23f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, flush_journal/1, sync_all/1,
+ write_delivered/2, write_acks/2, flush_journal/1, sync_seq_ids/2,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
@@ -217,11 +217,21 @@ full_flush_journal(State) ->
{false, State1} -> State1
end.
-sync_all(State = #qistate { seg_num_handles = SegHdls }) ->
- ok = dict:fold(fun (_Key, Hdl, ok) ->
- file_handle_cache:sync(Hdl)
- end, ok, SegHdls),
- State.
+sync_seq_ids(SeqIds, State) ->
+ {Hdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:sync(Hdl),
+ SegNumsSet =
+ lists:foldl(
+ fun (SeqId, Set) ->
+ {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ sets:add_element(SegNum, Set)
+ end, sets:new(), SeqIds),
+ sets:fold(
+ fun (SegNum, StateN) ->
+ {Hdl1, StateM} = get_seg_handle(SegNum, StateN),
+ ok = file_handle_cache:sync(Hdl1),
+ StateM
+ end, State1, SegNumsSet).
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
{false, State};
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7a85b30272..4dce1f6a64 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -379,15 +379,14 @@ tx_commit(Pubs, AckTags, From, State) ->
end.
do_tx_commit(Pubs, AckTags, From, State) ->
- {_PubSeqIds, State1} =
+ State1 = ack(AckTags, State),
+ {PubSeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg, {SeqIdsAcc, StateN}) ->
{SeqId, StateN1} = publish(Msg, false, true, StateN),
{[SeqId | SeqIdsAcc], StateN1}
- end, {[], State}, Pubs),
- %% TODO need to do something here about syncing the queue index, PubSeqIds
- State2 = #vqstate { index_state = IndexState } = ack(AckTags, State1),
- IndexState1 = rabbit_queue_index:sync_all(IndexState),
+ end, {[], State1}, Pubs),
+ IndexState1 = rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState),
gen_server2:reply(From, ok),
State2 #vqstate { index_state = IndexState1 }.