diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-22 16:23:55 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-22 16:23:55 +0100 |
| commit | 7d78971a0eef645b34905cd681e48012ce7cf07d (patch) | |
| tree | 2dfecec671a3c4aa05fde0f1d7ec727a74ad1237 | |
| parent | 966a760267e30aa0d0c3a5b2a841fa9350238689 (diff) | |
| download | rabbitmq-server-git-7d78971a0eef645b34905cd681e48012ce7cf07d.tar.gz | |
bug fix in fhc:read (not tracking change in offset). Also made qi use new fhc, and remove hdfhc
| -rw-r--r-- | src/file_handle_cache.erl | 59 | ||||
| -rw-r--r-- | src/horrendously_dumb_file_handle_cache.erl | 246 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 78 |
3 files changed, 91 insertions, 292 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index e86344a087..33a69ed78f 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -134,12 +134,17 @@ read(Ref, NewOffset, Count, State) -> case write_buffer(Handle) of {ok, Handle2} -> case maybe_seek(NewOffset, Handle2) of - {ok, Handle3 = #handle { hdl = Hdl }} -> + {ok, Handle3 = #handle { hdl = Hdl, + offset = Offset }} -> case file:read(Hdl, Count) of - {ok, _} = Obj -> {Obj, Handle3}; - eof -> {eof, - Handle3 #handle { at_eof = true }}; - {error, _} = Error -> {Error, Handle3} + {ok, _} = Obj -> + {Obj, Handle3 #handle { + offset = Offset + Count }}; + eof -> + {eof, Handle3 #handle { + at_eof = true }}; + {error, _} = Error -> + {Error, Handle3} end; {Error, Handle3} -> {Error, Handle3} end; @@ -185,6 +190,50 @@ position(Ref, NewOffset, State) -> {Result, State} end. +sync(Ref, State) -> + case get({Ref, fhc_handle}) of + undefined -> {{error, not_open}, State}; + Handle = #handle { write_buffer = [], hdl = Hdl, offset = Offset } -> + {Result, Handle1} = + case file:sync(Hdl) of + ok -> {ok, Handle #handle { trusted_offset = Offset }}; + Error -> {Error, Handle} + end, + put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), + {Result, State}; + Handle = #handle { at_eof = true } -> + %% we can't have content in the buffer without being at eof + {Result, Handle1} = write_buffer(Handle), + put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), + {Result, State} + end. + +truncate(Ref, State) -> + case get({Ref, fhc_handle}) of + undefined -> {{error, not_open}, State}; + Handle = #handle { is_write = true } -> + {Result, Handle1} = + case write_buffer(Handle) of + {ok, + Handle2 = #handle { hdl = Hdl, offset = Offset, + trusted_offset = TrustedOffset }} -> + case file:truncate(Hdl) of + ok -> + {ok, + Handle2 #handle { + at_eof = true, + trusted_offset = lists:min([Offset, + TrustedOffset]) + }}; + Error -> {Error, Handle2} + end; + {Error, Handle2} -> {Error, Handle2} + end, + put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now () }), + {Result, State}; + _Handle -> {{error, not_open_for_writing}, State} + end. + open1(Path, Mode, Options, GRef, State) -> case file:open(Path, Mode) of {ok, Hdl} -> diff --git a/src/horrendously_dumb_file_handle_cache.erl b/src/horrendously_dumb_file_handle_cache.erl deleted file mode 100644 index e3aa49b3ca..0000000000 --- a/src/horrendously_dumb_file_handle_cache.erl +++ /dev/null @@ -1,246 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(horrendously_dumb_file_handle_cache). - --export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2, - position/3, truncate/2, with_file_handle_at/4, sync_to_offset/3]). - --record(entry, - { hdl, - current_offset, - last_sync_offset, - write_buffer, - is_append, - at_eof, - path_mode_key }). - -init() -> empty_state. - -open(Path, Mode, [] = _ExtraOptions, State) -> - Mode1 = lists:usort(Mode), - Path1 = filename:absname(Path), - Key = {Path1, Mode1}, - case get({fhc, path_mode_ref, Key}) of - {ref, Ref} -> {{ok, Ref}, State}; - undefined -> - case file:open(Path1, Mode1) of - {ok, Hdl} -> - Ref = make_ref(), - put({fhc, path_mode_ref, Key}, {ref, Ref}), - Entry = #entry { hdl = Hdl, current_offset = 0, - last_sync_offset = 0, write_buffer = [], - is_append = lists:member(append, Mode1), - at_eof = false, path_mode_key = Key }, - put({fhc, ref_entry, Ref}, Entry), - {{ok, Ref}, State}; - {error, Error} -> - {{error, Error}, State} - end - end. - -close(Ref, State) -> - {ok, - case erase({fhc, ref_entry, Ref}) of - #entry { hdl = Hdl, write_buffer = WriteBuffer, path_mode_key = Key } -> - ok = case WriteBuffer of - [] -> ok; - _ -> ok = file:write(Hdl, lists:reverse(WriteBuffer)), - file:sync(Hdl) - end, - ok = file:close(Hdl), - erase({fhc, path_mode_ref, Key}), - State; - undefined -> State - end}. - -release(_Ref, State) -> %% noop for the time being - {ok, State}. - -read(Ref, Offset, Count, State) -> - case get({fhc, ref_entry, Ref}) of - Entry = #entry { hdl = Hdl, current_offset = OldOffset, - write_buffer = WriteBuffer } -> - ok = case WriteBuffer of - [] -> ok; - _ -> file:write(Hdl, lists:reverse(WriteBuffer)) - end, - NewOffset = Count + - case Offset of - cur -> OldOffset; - _ -> {ok, RealOff} = file:position(Hdl, Offset), - RealOff - end, - put({fhc, ref_entry, Ref}, - Entry #entry { current_offset = NewOffset, - at_eof = Offset =:= eof, - write_buffer = [] }), - {file:read(Hdl, Count), State}; - undefined -> {{error, not_open}, State} - end. - -%% if the file was opened in append mode, then Offset is ignored, as -%% it would only affect the read head for this file. -write(Ref, Offset, Data, State) -> - case get({fhc, ref_entry, Ref}) of - Entry = #entry { hdl = Hdl, current_offset = OldOffset, - is_append = IsAppend, at_eof = AtEoF, - write_buffer = WriteBuffer } -> - NewOffset = - case IsAppend of - true -> - OldOffset; - false -> - size_of_write_data(Data) + - case Offset of - cur -> OldOffset; - eof when AtEoF -> OldOffset; - _ -> {ok, RealOff} = file:position(Hdl, Offset), - RealOff - end - end, - WriteBuffer1 = [Data | WriteBuffer], - put({fhc, ref_entry, Ref}, - Entry #entry { current_offset = NewOffset, - at_eof = Offset =:= eof, - write_buffer = WriteBuffer1 }), - {ok, State}; - undefined -> {{error, not_open}, State} - end. - -sync(Ref, State) -> - case get({fhc, ref_entry, Ref}) of - #entry { write_buffer = [] } -> {ok, State}; - Entry = #entry { hdl = Hdl, current_offset = Offset, - last_sync_offset = LastSyncOffset, - write_buffer = WriteBuffer } -> - SyncOffset = lists:max([Offset, LastSyncOffset]), - ok = file:write(Hdl, lists:reverse(WriteBuffer)), - ok = file:sync(Hdl), - put({fhc, ref_entry, Ref}, - Entry #entry { last_sync_offset = SyncOffset, - write_buffer = [] }), - {ok, State}; - undefined -> {{error, not_open}, State} - end. - -position(Ref, NewOffset, State) -> - case get({fhc, ref_entry, Ref}) of - #entry { current_offset = NewOffset } -> - {ok, State}; - #entry { at_eof = true } when NewOffset =:= eof -> - {ok, State}; - Entry = #entry { hdl = Hdl, write_buffer = WriteBuffer } -> - ok = case WriteBuffer of - [] -> ok; - _ -> file:write(Hdl, lists:reverse(WriteBuffer)) - end, - {ok, RealOff} = file:position(Hdl, NewOffset), - put({fhc, ref_entry, Ref}, - Entry #entry { current_offset = RealOff, - write_buffer = [], - at_eof = NewOffset =:= eof }), - {ok, State}; - undefined -> - {{error, not_open}, State} - end. - -truncate(Ref, State) -> - case get({fhc, ref_entry, Ref}) of - Entry = #entry { hdl = Hdl, write_buffer = WriteBuffer } -> - ok = case WriteBuffer of - [] -> ok; - _ -> file:write(Hdl, lists:reverse(WriteBuffer)) - end, - ok = file:truncate(Hdl), - put({fhc, ref_entry, Ref}, - Entry #entry { at_eof = true, write_buffer = [] }), - {ok, State}; - undefined -> {{error, not_open}, State} - end. - -with_file_handle_at(Ref, Offset, Fun, State) -> - case get({fhc, ref_entry, Ref}) of - Entry = #entry { hdl = Hdl, current_offset = OldOffset, - write_buffer = WriteBuffer, at_eof = AtEoF } -> - ok = case WriteBuffer of - [] -> ok; - _ -> file:write(Hdl, lists:reverse(WriteBuffer)) - end, - ok = case Offset of - eof when AtEoF -> ok; - cur -> ok; - OldOffset -> ok; - _ -> {ok, _RealOff} = file:position(Hdl, Offset), - ok - end, - {Offset2, Result} = Fun(Hdl), - put({fhc, ref_entry, Ref}, - Entry #entry { current_offset = Offset2, write_buffer = [], - at_eof = false }), - {Result, State}; - undefined -> {{error, not_open}, State} - end. - -sync_to_offset(Ref, Offset, State) -> - case get({fhc, ref_entry, Ref}) of - Entry = #entry { hdl = Hdl, last_sync_offset = LastSyncOffset, - current_offset = CurOffset, - write_buffer = [_|_] = WriteBuffer } - when (Offset =:= cur andalso CurOffset > LastSyncOffset) - orelse (Offset > LastSyncOffset) -> - ok = case WriteBuffer of - [] -> ok; - _ -> file:write(Hdl, lists:reverse(WriteBuffer)) - end, - ok = file:sync(Hdl), - LastSyncOffset1 = - case Offset of - cur -> lists:max([LastSyncOffset, CurOffset]); - _ -> lists:max([LastSyncOffset, CurOffset, Offset]) - end, - put({fhc, ref_entry, Ref}, - Entry #entry { last_sync_offset = LastSyncOffset1, - write_buffer = [] }), - {ok, State}; - #entry {} -> {ok, State}; - error -> {{error, not_open}, State} - end. - -size_of_write_data(Data) -> - size_of_write_data(Data, 0). - -size_of_write_data([], Acc) -> - Acc; -size_of_write_data([A|B], Acc) -> - size_of_write_data(B, size_of_write_data(A, Acc)); -size_of_write_data(Bin, Acc) when is_binary(Bin) -> - size(Bin) + Acc. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 48da7e3f33..a50d839c87 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -156,7 +156,7 @@ %%---------------------------------------------------------------------------- init(Name) -> - HCState = horrendously_dumb_file_handle_cache:init(), + HCState = file_handle_cache:init(), StrName = queue_name_to_dir_name(Name), Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), @@ -185,19 +185,19 @@ write_published(MsgId, SeqId, IsPersistent, State) ?MSG_ID_BYTES = size(MsgId), {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), {Hdl, State1} = get_seg_handle(SegNum, State), - {ok, HCState} = horrendously_dumb_file_handle_cache:write( - Hdl, eof, - <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, MsgId/binary>>, - State1 #qistate.hc_state), + {ok, HCState} = + file_handle_cache:append(Hdl, + <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, MsgId/binary>>, + State1 #qistate.hc_state), State1 #qistate { hc_state = HCState }. write_delivered(SeqId, State) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), {Hdl, State1} = get_seg_handle(SegNum, State), - {ok, HCState} = horrendously_dumb_file_handle_cache:write( - Hdl, eof, + {ok, HCState} = file_handle_cache:append( + Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, State1 #qistate.hc_state), @@ -209,9 +209,8 @@ write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict, {JAckDict1, JAckCount1, HCState} = lists:foldl( fun (SeqId, {JAckDict2, JAckCount2, HCStateN}) -> - {ok, HCStateM} = - horrendously_dumb_file_handle_cache:write( - Hdl, eof, <<SeqId:?SEQ_BITS>>, HCStateN), + {ok, HCStateM} = file_handle_cache:append( + Hdl, <<SeqId:?SEQ_BITS>>, HCStateN), {add_ack_to_ack_dict(SeqId, JAckDict2), JAckCount2 + 1, HCStateM} end, {JAckDict, JAckCount, State1 #qistate.hc_state}, SeqIds), @@ -233,8 +232,7 @@ sync_all(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) -> HCState1 = dict:fold( fun (_Key, Hdl, HCStateN) -> - {ok, HCStateM} = - horrendously_dumb_file_handle_cache:sync(Hdl, HCStateN), + {ok, HCStateM} = file_handle_cache:sync(Hdl, HCStateN), HCStateM end, HCState, SegHdls), State #qistate { hc_state = HCState1 }. @@ -253,10 +251,8 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict, JAckCount1 == 0 -> {Hdl, State3 = #qistate { hc_state = HCState }} = get_journal_handle(State2), - {ok, HCState1} = - horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState), - {ok, HCState2} = - horrendously_dumb_file_handle_cache:truncate(Hdl, HCState1), + {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState), + {ok, HCState2} = file_handle_cache:truncate(Hdl, HCState1), {false, State3 #qistate { hc_state = HCState2 }}; JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> flush_journal(State2); @@ -391,8 +387,8 @@ new_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) -> true -> close_all_handles(State); false -> State end, - {{ok, Hdl}, HCState1} = - horrendously_dumb_file_handle_cache:open(Path, Mode, [], HCState), + {{ok, Hdl}, HCState1} = + file_handle_cache:open(Path, Mode, [{write_buffer, infinity}], HCState), {Hdl, State1 #qistate { hc_state = HCState1, seg_num_handles = dict:store(Key, Hdl, SegHdls1) }}. @@ -400,8 +396,7 @@ close_handle(Key, State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) -> case dict:find(Key, SegHdls) of {ok, Hdl} -> - {ok, HCState1} = - horrendously_dumb_file_handle_cache:close(Hdl, HCState), + {ok, HCState1} = file_handle_cache:close(Hdl, HCState), State #qistate { hc_state = HCState1, seg_num_handles = dict:erase(Key, SegHdls) }; error -> State @@ -411,9 +406,8 @@ close_all_handles(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) -> HCState1 = dict:fold( - fun (_Key, Ref, HCStateN) -> - {ok, HCStateM} = - horrendously_dumb_file_handle_cache:close(Ref, HCStateN), + fun (_Key, Hdl, HCStateN) -> + {ok, HCStateM} = file_handle_cache:close(Hdl, HCStateN), HCStateM end, HCState, SegHdls), State #qistate { hc_state = HCState1, seg_num_handles = dict:new() }. @@ -527,8 +521,7 @@ scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) -> {TotalMsgCount1, State3 #qistate { journal_ack_dict = dict:new() }}. load_journal(Hdl, ADict, HCState) -> - case horrendously_dumb_file_handle_cache:read( - Hdl, cur, ?SEQ_BYTES, HCState) of + case file_handle_cache:read(Hdl, cur, ?SEQ_BYTES, HCState) of {{ok, <<SeqId:?SEQ_BITS>>}, HCState1} -> load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict), HCState1); {_ErrOrEoF, HCState1} -> {ADict, HCState1} @@ -561,11 +554,16 @@ deliver_transient(SegNum, SDict, State) -> {[RelSeq | AckMeAcc], DeliverMeAcc} end, {[], []}, SDict), {Hdl, State1} = get_seg_handle(SegNum, State), - {ok, HCState} = horrendously_dumb_file_handle_cache:write( - Hdl, eof, - [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ], - State1 #qistate.hc_state), + {ok, HCState} = + case DeliverMe of + [] -> {ok, State1 #qistate.hc_state}; + _ -> + file_handle_cache:append( + Hdl, + [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ], + State1 #qistate.hc_state) + end, {AckMe, State1 #qistate { hc_state = HCState }}. @@ -585,9 +583,7 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, {Hdl, State1 = #qistate { hc_state = HCState, journal_ack_dict = JAckDict }} = get_seg_handle(SegNum, State), - {ok, HCState1} = - horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState), - + {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState), {SDict, AckCount, HighRelSeq, HCState2} = load_segment_entries(Hdl, dict:new(), 0, 0, HCState1), RelSeqs = case dict:find(SegNum, JAckDict) of @@ -603,11 +599,11 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, end. load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) -> - case horrendously_dumb_file_handle_cache:read(Hdl, cur, 1, HCState) of + case file_handle_cache:read(Hdl, cur, 1, HCState) of {{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>}, HCState1} -> {{ok, LSB}, HCState2} = - horrendously_dumb_file_handle_cache:read( + file_handle_cache:read( Hdl, cur, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), @@ -617,7 +613,7 @@ load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. {{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>}, HCState2} = - horrendously_dumb_file_handle_cache:read( + file_handle_cache:read( Hdl, cur, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), @@ -673,11 +669,11 @@ append_acks_to_segment(SegNum, AckCount, Acks, State) lists:foldl( fun (RelSeq, {AckCount2, HCStateN}) -> {ok, HCStateM} = - horrendously_dumb_file_handle_cache:write( - Hdl, eof, + file_handle_cache:append( + Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, HCStateN), {AckCount2 + 1, HCStateM} end, {AckCount, State1 #qistate.hc_state}, Acks), - {ok, HCState1} = horrendously_dumb_file_handle_cache:sync(Hdl, HCState), + {ok, HCState1} = file_handle_cache:sync(Hdl, HCState), {AckCount1, State1 #qistate { hc_state = HCState1 }}. |
