diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-02 22:29:05 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-02 22:29:05 +0000 |
| commit | 19bb74623da7a270ba6bfbc6e8f4a0bbb4f363fd (patch) | |
| tree | 5db2fd5102075c0144be4440478fd5863d7a8575 /src | |
| parent | 13ff649097a4c3f0baa9549e0a32d37d1e326264 (diff) | |
| download | rabbitmq-server-git-19bb74623da7a270ba6bfbc6e8f4a0bbb4f363fd.tar.gz | |
Up to 11kHz persistent on my home machine. Still a little way off the 13kHz that I was getting before, but there were bugs in the previous QI (eg missing syncs) which could well have led to the old version being too fast. Added two functions to fhc: delete/1 which deletes without flushing any data at all. It will refuse to delete if the file isn't open; discard_write_buffer/1 which does what it says. We use the latter after scattering the journal as after we hit each segment, we sync the segment, so at that point there's no need at all to force out the data in the fhc for the journal prior to truncation.
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 110 |
2 files changed, 106 insertions, 30 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 2a3f1ded74..8e0849022e 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -119,7 +119,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, - append_write_buffer/1, copy/3, set_maximum_since_use/1]). + append_write_buffer/1, copy/3, set_maximum_since_use/1, delete/1, + discard_write_buffer/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -193,6 +194,8 @@ -spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> ({'ok', integer()} | error())). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). +-spec(delete/1 :: (ref()) -> ok_or_error()). +-spec(discard_write_buffer/1 :: (ref()) -> ok_or_error()). -endif. @@ -361,6 +364,27 @@ copy(Src, Dest, Count) -> {error, incorrect_handle_modes} end). +delete(Ref) -> + case erase({Ref, fhc_handle}) of + undefined -> ok; + Handle = #handle { path = Path } -> + Handle1 = Handle #handle { is_dirty = false, write_buffer = [] }, + case close1(Ref, Handle1, hard) of + ok -> file:delete(Path); + Error -> Error + end + end. + +discard_write_buffer(Ref) -> + with_handles( + [Ref], + fun ([#handle { write_buffer = [] }]) -> + ok; + ([Handle = #handle { write_buffer_size = Size, offset = Offset }]) -> + {ok, [Handle #handle { write_buffer = [], write_buffer_size = 0, + offset = Offset - Size }]} + end). + set_maximum_since_use(MaximumAge) -> Now = now(), case lists:foldl( diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index acebc32d6c..bbd9508672 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -87,7 +87,9 @@ { dir, segments, journal_handle, - dirty_count + dirty_count, + last_seg_a, + last_seg_b }). -record(segment, @@ -238,8 +240,9 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> flush_journal(State = #qistate { dirty_count = 0 }) -> State; -flush_journal(State = #qistate { segments = Segments }) -> - State1 = +flush_journal(State) -> + State1 = #qistate { segments = Segments } = get_all_segments(State), + State2 = dict:fold( fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, acks = AckCount } = Segment, StateN) -> @@ -261,12 +264,13 @@ flush_journal(State = #qistate { segments = Segments }) -> dict:new() }, StateN) end end - end, State #qistate { segments = dict:new() }, Segments), - {JournalHdl, State2} = get_journal_handle(State1), + end, State1 #qistate { segments = dict:new() }, Segments), + {JournalHdl, State3} = get_journal_handle(State2), + ok = file_handle_cache:discard_write_buffer(JournalHdl), {ok, 0} = file_handle_cache:position(JournalHdl, bof), ok = file_handle_cache:truncate(JournalHdl), ok = file_handle_cache:sync(JournalHdl), - State2 #qistate { dirty_count = 0 }. + State3 #qistate { dirty_count = 0 }. read_segment_entries(InitSeqId, State) -> {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), @@ -384,7 +388,8 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount }) maybe_flush_journal(State) -> State. -all_segment_nums(#qistate { segments = Segments, dir = Dir }) -> +all_segment_nums(State = #qistate { dir = Dir }) -> + #qistate { segments = Segments } = get_all_segments(State), sets:to_list( lists:foldl( fun (SegName, Set) -> @@ -402,7 +407,9 @@ blank_state(QueueName) -> #qistate { dir = Dir, segments = dict:new(), journal_handle = undefined, - dirty_count = 0 + dirty_count = 0, + last_seg_a = undefined, + last_seg_b = undefined }. rev_sort(List) -> @@ -420,9 +427,8 @@ seg_num_to_path(Dir, Seg) -> delete_segment(#segment { handle = undefined }) -> ok; -delete_segment(#segment { handle = Hdl, path = Path }) -> - ok = file_handle_cache:close(Hdl), - ok = file:delete(Path), +delete_segment(#segment { handle = Hdl }) -> + ok = file_handle_cache:delete(Hdl), ok. detect_clean_shutdown(Dir) -> @@ -462,6 +468,10 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> get_segment_handle(Segment = #segment { handle = Hdl }) -> {Hdl, Segment}. +find_segment(Seg, #qistate { last_seg_a = #segment { num = Seg } = Segment }) -> + Segment; +find_segment(Seg, #qistate { last_seg_b = #segment { num = Seg } = Segment }) -> + Segment; find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> case dict:find(Seg, Segments) of {ok, Segment = #segment{}} -> Segment; @@ -474,9 +484,46 @@ find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> } end. -store_segment(Segment = #segment { num = Seg }, - State = #qistate { segments = Segments }) -> - State #qistate { segments = dict:store(Seg, Segment, Segments) }. +store_segment(Segment = #segment { num = Seg }, State = + #qistate { last_seg_a = #segment { num = Seg }}) -> + State #qistate { last_seg_a = Segment }; +store_segment(Segment = #segment { num = Seg }, State = + #qistate { last_seg_b = #segment { num = Seg }}) -> + State #qistate { last_seg_b = Segment }; +store_segment(Segment, State = + #qistate { last_seg_a = LastSegA, last_seg_b = LastSegB }) -> + case LastSegA of + undefined -> + State #qistate { last_seg_a = Segment }; + _ -> + case LastSegB of + undefined -> + State #qistate { last_seg_b = Segment }; + _ -> + State1 = #qistate { segments = Segments } = + State #qistate { last_seg_a = LastSegB, + last_seg_b = Segment }, + State1 #qistate { + segments = return_segment_to_dict(LastSegA, Segments) } + end + end. + +get_all_segments(State = #qistate { last_seg_a = undefined, + last_seg_b = undefined }) -> + State; +get_all_segments(State = #qistate { segments = Segments, + last_seg_a = LastSegA, + last_seg_b = LastSegB }) -> + State #qistate { last_seg_a = undefined, + last_seg_b = undefined, + segments = return_segment_to_dict( + LastSegB, + return_segment_to_dict(LastSegA, Segments)) }. + +return_segment_to_dict(undefined, Segments) -> + Segments; +return_segment_to_dict(Segment = #segment { num = Seg }, Segments) -> + dict:store(Seg, Segment, Segments). get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> @@ -517,8 +564,9 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> Hdl. terminate(StoreShutdown, State = - #qistate { segments = Segments, journal_handle = JournalHdl, + #qistate { journal_handle = JournalHdl, dir = Dir }) -> + State1 = #qistate { segments = Segments } = get_all_segments(State), ok = case JournalHdl of undefined -> ok; _ -> file_handle_cache:close(JournalHdl) @@ -533,7 +581,7 @@ terminate(StoreShutdown, State = true -> store_clean_shutdown(Dir); false -> ok end, - State #qistate { journal_handle = undefined, segments = dict:new() }. + State1 #qistate { journal_handle = undefined, segments = dict:new() }. %%---------------------------------------------------------------------------- %% Majors @@ -608,7 +656,8 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) -> load_journal(State) -> {JournalHdl, State1} = get_journal_handle(State), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - State2 = #qistate { segments = Segments } = load_journal_entries(State1), + State2 = #qistate { segments = Segments } = + get_all_segments(load_journal_entries(State1)), dict:fold( fun (Seg, #segment { journal_entries = JEntries, pubs = PubCountInJournal, @@ -681,18 +730,21 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> %% pub. Also, always want to keep acks. Things must occur in the right %% order though. add_to_journal(RelSeq, Action, SegJDict) -> - case dict:find(RelSeq, SegJDict) of - {ok, {PubRecord, no_del, no_ack}} when Action == del -> - dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict); - {ok, {PubRecord, DelRecord, no_ack}} when Action == ack -> - dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict); - error when Action == del -> - dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict); - error when Action == ack -> - dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict); - error -> - {_MsgId, _IsPersistent} = Action, %% ASSERTION - dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict) + case dict:is_key(RelSeq, SegJDict) of + true -> + dict:update(RelSeq, + fun ({PubRecord, no_del, no_ack}) when Action == del -> + {PubRecord, del, no_ack}; + ({PubRecord, Del, no_ack}) when Action == ack -> + {PubRecord, Del, ack} + end, SegJDict); + false -> + dict:store(RelSeq, + case Action of + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack}; + {_Msg, _IsPersistent} -> {Action, no_del, no_ack} + end, SegJDict) end. %% Combine what we have just read from a segment file with what we're |
