diff options
| -rw-r--r-- | src/file_handle_cache.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 110 |
2 files changed, 106 insertions, 30 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 2a3f1ded74..8e0849022e 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -119,7 +119,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, - append_write_buffer/1, copy/3, set_maximum_since_use/1]). + append_write_buffer/1, copy/3, set_maximum_since_use/1, delete/1, + discard_write_buffer/1]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -193,6 +194,8 @@ -spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> ({'ok', integer()} | error())). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). +-spec(delete/1 :: (ref()) -> ok_or_error()). +-spec(discard_write_buffer/1 :: (ref()) -> ok_or_error()). -endif. @@ -361,6 +364,27 @@ copy(Src, Dest, Count) -> {error, incorrect_handle_modes} end). +delete(Ref) -> + case erase({Ref, fhc_handle}) of + undefined -> ok; + Handle = #handle { path = Path } -> + Handle1 = Handle #handle { is_dirty = false, write_buffer = [] }, + case close1(Ref, Handle1, hard) of + ok -> file:delete(Path); + Error -> Error + end + end. + +discard_write_buffer(Ref) -> + with_handles( + [Ref], + fun ([#handle { write_buffer = [] }]) -> + ok; + ([Handle = #handle { write_buffer_size = Size, offset = Offset }]) -> + {ok, [Handle #handle { write_buffer = [], write_buffer_size = 0, + offset = Offset - Size }]} + end). + set_maximum_since_use(MaximumAge) -> Now = now(), case lists:foldl( diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index acebc32d6c..bbd9508672 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -87,7 +87,9 @@ { dir, segments, journal_handle, - dirty_count + dirty_count, + last_seg_a, + last_seg_b }). -record(segment, @@ -238,8 +240,9 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> flush_journal(State = #qistate { dirty_count = 0 }) -> State; -flush_journal(State = #qistate { segments = Segments }) -> - State1 = +flush_journal(State) -> + State1 = #qistate { segments = Segments } = get_all_segments(State), + State2 = dict:fold( fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, acks = AckCount } = Segment, StateN) -> @@ -261,12 +264,13 @@ flush_journal(State = #qistate { segments = Segments }) -> dict:new() }, StateN) end end - end, State #qistate { segments = dict:new() }, Segments), - {JournalHdl, State2} = get_journal_handle(State1), + end, State1 #qistate { segments = dict:new() }, Segments), + {JournalHdl, State3} = get_journal_handle(State2), + ok = file_handle_cache:discard_write_buffer(JournalHdl), {ok, 0} = file_handle_cache:position(JournalHdl, bof), ok = file_handle_cache:truncate(JournalHdl), ok = file_handle_cache:sync(JournalHdl), - State2 #qistate { dirty_count = 0 }. + State3 #qistate { dirty_count = 0 }. read_segment_entries(InitSeqId, State) -> {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), @@ -384,7 +388,8 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount }) maybe_flush_journal(State) -> State. -all_segment_nums(#qistate { segments = Segments, dir = Dir }) -> +all_segment_nums(State = #qistate { dir = Dir }) -> + #qistate { segments = Segments } = get_all_segments(State), sets:to_list( lists:foldl( fun (SegName, Set) -> @@ -402,7 +407,9 @@ blank_state(QueueName) -> #qistate { dir = Dir, segments = dict:new(), journal_handle = undefined, - dirty_count = 0 + dirty_count = 0, + last_seg_a = undefined, + last_seg_b = undefined }. rev_sort(List) -> @@ -420,9 +427,8 @@ seg_num_to_path(Dir, Seg) -> delete_segment(#segment { handle = undefined }) -> ok; -delete_segment(#segment { handle = Hdl, path = Path }) -> - ok = file_handle_cache:close(Hdl), - ok = file:delete(Path), +delete_segment(#segment { handle = Hdl }) -> + ok = file_handle_cache:delete(Hdl), ok. detect_clean_shutdown(Dir) -> @@ -462,6 +468,10 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> get_segment_handle(Segment = #segment { handle = Hdl }) -> {Hdl, Segment}. +find_segment(Seg, #qistate { last_seg_a = #segment { num = Seg } = Segment }) -> + Segment; +find_segment(Seg, #qistate { last_seg_b = #segment { num = Seg } = Segment }) -> + Segment; find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> case dict:find(Seg, Segments) of {ok, Segment = #segment{}} -> Segment; @@ -474,9 +484,46 @@ find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> } end. -store_segment(Segment = #segment { num = Seg }, - State = #qistate { segments = Segments }) -> - State #qistate { segments = dict:store(Seg, Segment, Segments) }. +store_segment(Segment = #segment { num = Seg }, State = + #qistate { last_seg_a = #segment { num = Seg }}) -> + State #qistate { last_seg_a = Segment }; +store_segment(Segment = #segment { num = Seg }, State = + #qistate { last_seg_b = #segment { num = Seg }}) -> + State #qistate { last_seg_b = Segment }; +store_segment(Segment, State = + #qistate { last_seg_a = LastSegA, last_seg_b = LastSegB }) -> + case LastSegA of + undefined -> + State #qistate { last_seg_a = Segment }; + _ -> + case LastSegB of + undefined -> + State #qistate { last_seg_b = Segment }; + _ -> + State1 = #qistate { segments = Segments } = + State #qistate { last_seg_a = LastSegB, + last_seg_b = Segment }, + State1 #qistate { + segments = return_segment_to_dict(LastSegA, Segments) } + end + end. + +get_all_segments(State = #qistate { last_seg_a = undefined, + last_seg_b = undefined }) -> + State; +get_all_segments(State = #qistate { segments = Segments, + last_seg_a = LastSegA, + last_seg_b = LastSegB }) -> + State #qistate { last_seg_a = undefined, + last_seg_b = undefined, + segments = return_segment_to_dict( + LastSegB, + return_segment_to_dict(LastSegA, Segments)) }. + +return_segment_to_dict(undefined, Segments) -> + Segments; +return_segment_to_dict(Segment = #segment { num = Seg }, Segments) -> + dict:store(Seg, Segment, Segments). get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> @@ -517,8 +564,9 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> Hdl. terminate(StoreShutdown, State = - #qistate { segments = Segments, journal_handle = JournalHdl, + #qistate { journal_handle = JournalHdl, dir = Dir }) -> + State1 = #qistate { segments = Segments } = get_all_segments(State), ok = case JournalHdl of undefined -> ok; _ -> file_handle_cache:close(JournalHdl) @@ -533,7 +581,7 @@ terminate(StoreShutdown, State = true -> store_clean_shutdown(Dir); false -> ok end, - State #qistate { journal_handle = undefined, segments = dict:new() }. + State1 #qistate { journal_handle = undefined, segments = dict:new() }. %%---------------------------------------------------------------------------- %% Majors @@ -608,7 +656,8 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) -> load_journal(State) -> {JournalHdl, State1} = get_journal_handle(State), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - State2 = #qistate { segments = Segments } = load_journal_entries(State1), + State2 = #qistate { segments = Segments } = + get_all_segments(load_journal_entries(State1)), dict:fold( fun (Seg, #segment { journal_entries = JEntries, pubs = PubCountInJournal, @@ -681,18 +730,21 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> %% pub. Also, always want to keep acks. Things must occur in the right %% order though. add_to_journal(RelSeq, Action, SegJDict) -> - case dict:find(RelSeq, SegJDict) of - {ok, {PubRecord, no_del, no_ack}} when Action == del -> - dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict); - {ok, {PubRecord, DelRecord, no_ack}} when Action == ack -> - dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict); - error when Action == del -> - dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict); - error when Action == ack -> - dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict); - error -> - {_MsgId, _IsPersistent} = Action, %% ASSERTION - dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict) + case dict:is_key(RelSeq, SegJDict) of + true -> + dict:update(RelSeq, + fun ({PubRecord, no_del, no_ack}) when Action == del -> + {PubRecord, del, no_ack}; + ({PubRecord, Del, no_ack}) when Action == ack -> + {PubRecord, Del, ack} + end, SegJDict); + false -> + dict:store(RelSeq, + case Action of + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack}; + {_Msg, _IsPersistent} -> {Action, no_del, no_ack} + end, SegJDict) end. %% Combine what we have just read from a segment file with what we're |
