diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-23 12:15:41 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-23 12:15:41 +0100 |
| commit | ced4fd7cf1a74bd0bbbdc18819d632a4aae87be8 (patch) | |
| tree | 60e77591d935515696e53d1f87cf34fa73ad402d /src | |
| parent | abf7da7eb8bc832f594bf030d7ea6c709ad650ff (diff) | |
| download | rabbitmq-server-git-ced4fd7cf1a74bd0bbbdc18819d632a4aae87be8.tar.gz | |
Removed state from fhc. The reasoning is as follows:
1) the fhc uses the process dict
2) the fhc needs to receive msgs from its server process as to limiting the age of unused fhs. This is clearly per process
as such, it's daft to cater for a process having more than one state for the fhc. Thus any state necessary will also be put in the process dict.
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 113 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 194 |
2 files changed, 134 insertions, 173 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index fe86044bc1..6cb4c094db 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -31,8 +31,8 @@ -module(file_handle_cache). --export([init/0, open/4, close/2, release/2, read/3, append/3, sync/2, - position/3, truncate/2, last_sync_offset/2]). +-export([open/3, close/1, release/1, read/2, append/2, sync/1, + position/2, truncate/1, last_sync_offset/1]). -record(file, { reader_count, @@ -56,11 +56,9 @@ last_used_at }). -init() -> empty_state. - -open(Path, Mode, Options, State) -> +open(Path, Mode, Options) -> case is_appender(Mode) of - true -> {{error, append_not_supported}, State}; + true -> {error, append_not_supported}; false -> Path1 = filename:absname(Path), case get({Path1, fhc_path}) of @@ -71,7 +69,7 @@ open(Path, Mode, Options, State) -> IsWriter = is_writer(Mode1), case IsWriter andalso HasWriter of true -> - {{error, writer_exists}, State}; + {error, writer_exists}; false -> RCount1 = case is_reader(Mode1) of true -> RCount + 1; @@ -82,10 +80,9 @@ open(Path, Mode, Options, State) -> reader_count = RCount1, has_writer = HasWriter orelse IsWriter }), Ref = make_ref(), - case open1(Path1, Mode1, Options, Ref, GRef, State) - of - {{ok, _Handle}, State} -> {{ok, Ref}, State}; - {Error, State} -> {Error, State} + case open1(Path1, Mode1, Options, Ref, GRef) of + {ok, _Handle} -> {ok, Ref}; + Error -> Error end end; undefined -> @@ -94,13 +91,13 @@ open(Path, Mode, Options, State) -> put({GRef, fhc_file}, #file { reader_count = 0, has_writer = false, path = Path1 }), - open(Path, Mode, Options, State) + open(Path, Mode, Options) end end. -close(Ref, State) -> +close(Ref) -> case erase({Ref, fhc_handle}) of - undefined -> {ok, State}; + undefined -> ok; Handle -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, global_key = GRef, @@ -124,21 +121,21 @@ close(Ref, State) -> File #file { reader_count = RCount1, has_writer = HasWriter1 }) end, - {ok, State}; + ok; {Error, Handle1} -> put({Ref, fhc_handle}, Handle1), - {Error, State} + Error end end. -release(_Ref, State) -> %% noop just for now - {ok, State}. +release(_Ref) -> %% noop just for now + ok. -read(Ref, Count, State) -> - case get_or_reopen(Ref, State) of - {{ok, #handle { is_read = false }}, State1} -> - {{error, not_open_for_reading}, State1}; - {{ok, Handle}, State1} -> +read(Ref, Count) -> + case get_or_reopen(Ref) of + {ok, #handle { is_read = false }} -> + {error, not_open_for_reading}; + {ok, Handle} -> {Result, Handle1} = case write_buffer(Handle) of {ok, Handle2 = #handle { hdl = Hdl, offset = Offset }} -> @@ -153,15 +150,15 @@ read(Ref, Count, State) -> {Error, Handle2} -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State1}; + Result; ErrorAndState -> ErrorAndState end. -append(Ref, Data, State) -> - case get_or_reopen(Ref, State) of - {{ok, #handle { is_write = false }}, State1} -> - {{error, not_open_for_writing}, State1}; - {{ok, Handle}, State1} -> +append(Ref, Data) -> + case get_or_reopen(Ref) of + {ok, #handle { is_write = false }} -> + {error, not_open_for_writing}; + {ok, Handle} -> {Result, Handle1} = case maybe_seek(eof, Handle) of {ok, Handle2 = #handle { at_eof = true }} -> @@ -170,54 +167,54 @@ append(Ref, Data, State) -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State1}; + Result; ErrorAndState -> ErrorAndState end. -last_sync_offset(Ref, State) -> - case get_or_reopen(Ref, State) of - {{ok, #handle { trusted_offset = TrustedOffset }}, State1} -> - {{ok, TrustedOffset}, State1}; +last_sync_offset(Ref) -> + case get_or_reopen(Ref) of + {ok, #handle { trusted_offset = TrustedOffset }} -> + {ok, TrustedOffset}; ErrorAndState -> ErrorAndState end. -position(Ref, NewOffset, State) -> - case get_or_reopen(Ref, State) of - {{ok, Handle}, State1} -> +position(Ref, NewOffset) -> + case get_or_reopen(Ref) of + {ok, Handle} -> {Result, Handle1} = case write_buffer(Handle) of {ok, Handle2} -> maybe_seek(NewOffset, Handle2); {Error, Handle2} -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State1}; + Result; ErrorAndState -> ErrorAndState end. -sync(Ref, State) -> - case get_or_reopen(Ref, State) of - {{ok, Handle = #handle { write_buffer = [], hdl = Hdl, - offset = Offset }}, State1} -> +sync(Ref) -> + case get_or_reopen(Ref) of + {ok, Handle = #handle { write_buffer = [], hdl = Hdl, + offset = Offset }} -> {Result, Handle1} = case file:sync(Hdl) of ok -> {ok, Handle #handle { trusted_offset = Offset }}; Error -> {Error, Handle} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State1}; - {{ok, Handle = #handle { at_eof = true }}, State1} -> + Result; + {ok, Handle = #handle { at_eof = true }} -> %% we can't have content in the buffer without being at eof {Result, Handle1} = write_buffer(Handle), put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }), - {Result, State1}; + Result; ErrorAndState -> ErrorAndState end. -truncate(Ref, State) -> - case get_or_reopen(Ref, State) of - {{ok, #handle { is_write = false }}, State1} -> - {{error, not_open_for_writing}, State1}; - {{ok, Handle}, State1} -> +truncate(Ref) -> + case get_or_reopen(Ref) of + {ok, #handle { is_write = false }} -> + {error, not_open_for_writing}; + {ok, Handle} -> {Result, Handle1} = case write_buffer(Handle) of {ok, @@ -236,21 +233,21 @@ truncate(Ref, State) -> {Error, Handle2} -> {Error, Handle2} end, put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now () }), - {Result, State1}; + Result; ErrorAndState -> ErrorAndState end. -get_or_reopen(Ref, State) -> +get_or_reopen(Ref) -> case get({Ref, fhc_handle}) of - undefined -> {{error, not_open}, State}; + undefined -> {error, not_open}; #handle { hdl = closed, mode = Mode, global_key = GRef, options = Options } -> #file { path = Path } = get({GRef, fhc_file}), - open1(Path, Mode, Options, Ref, GRef, State); - Handle -> {{ok, Handle}, State} + open1(Path, Mode, Options, Ref, GRef); + Handle -> {ok, Handle} end. -open1(Path, Mode, Options, Ref, GRef, State) -> +open1(Path, Mode, Options, Ref, GRef) -> case file:open(Path, Mode) of {ok, Hdl} -> WriteBufferSize = @@ -267,9 +264,9 @@ open1(Path, Mode, Options, Ref, GRef, State) -> is_write = is_writer(Mode), is_read = is_reader(Mode), global_key = GRef, last_used_at = now() }, put({Ref, fhc_handle}, Handle), - {{ok, Handle}, State}; + {ok, Handle}; {error, Reason} -> - {{error, Reason}, State} + {error, Reason} end. maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, at_eof = AtEoF, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 50f013f89b..57abfa9dc0 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -109,7 +109,6 @@ -record(qistate, { dir, seg_num_handles, - hc_state, journal_ack_count, journal_ack_dict, seg_ack_counts @@ -125,7 +124,6 @@ -type(seq_id() :: integer()). -type(qistate() :: #qistate { dir :: file_path(), seg_num_handles :: dict(), - hc_state :: any(), journal_ack_count :: integer(), journal_ack_dict :: dict(), seg_ack_counts :: dict() @@ -156,13 +154,11 @@ %%---------------------------------------------------------------------------- init(Name) -> - HCState = file_handle_cache:init(), StrName = queue_name_to_dir_name(Name), Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), State = #qistate { dir = Dir, seg_num_handles = dict:new(), - hc_state = HCState, journal_ack_count = 0, journal_ack_dict = dict:new(), seg_ack_counts = dict:new() }, @@ -185,38 +181,31 @@ write_published(MsgId, SeqId, IsPersistent, State) ?MSG_ID_BYTES = size(MsgId), {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), {Hdl, State1} = get_seg_handle(SegNum, State), - {ok, HCState} = - file_handle_cache:append(Hdl, - <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, MsgId/binary>>, - State1 #qistate.hc_state), - State1 #qistate { hc_state = HCState }. + ok = file_handle_cache:append(Hdl, + <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, MsgId/binary>>), + State1. write_delivered(SeqId, State) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), {Hdl, State1} = get_seg_handle(SegNum, State), - {ok, HCState} = file_handle_cache:append( - Hdl, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, - State1 #qistate.hc_state), - State1 #qistate { hc_state = HCState }. + ok = file_handle_cache:append( + Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>), + State1. write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict, journal_ack_count = JAckCount }) -> {Hdl, State1} = get_journal_handle(State), - {JAckDict1, JAckCount1, HCState} = + {JAckDict1, JAckCount1} = lists:foldl( - fun (SeqId, {JAckDict2, JAckCount2, HCStateN}) -> - {ok, HCStateM} = file_handle_cache:append( - Hdl, <<SeqId:?SEQ_BITS>>, HCStateN), - {add_ack_to_ack_dict(SeqId, JAckDict2), - JAckCount2 + 1, HCStateM} - end, {JAckDict, JAckCount, State1 #qistate.hc_state}, SeqIds), + fun (SeqId, {JAckDict2, JAckCount2}) -> + ok = file_handle_cache:append(Hdl, <<SeqId:?SEQ_BITS>>), + {add_ack_to_ack_dict(SeqId, JAckDict2), JAckCount2 + 1} + end, {JAckDict, JAckCount}, SeqIds), State2 = State1 #qistate { journal_ack_dict = JAckDict1, - journal_ack_count = JAckCount1, - hc_state = HCState }, + journal_ack_count = JAckCount1 }, case JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT of true -> full_flush_journal(State2); false -> State2 @@ -228,14 +217,11 @@ full_flush_journal(State) -> {false, State1} -> State1 end. -sync_all(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) -> - HCState1 = - dict:fold( - fun (_Key, Hdl, HCStateN) -> - {ok, HCStateM} = file_handle_cache:sync(Hdl, HCStateN), - HCStateM - end, HCState, SegHdls), - State #qistate { hc_state = HCState1 }. +sync_all(State = #qistate { seg_num_handles = SegHdls }) -> + ok = dict:fold(fun (_Key, Hdl, ok) -> + file_handle_cache:sync(Hdl) + end, ok, SegHdls), + State. flush_journal(State = #qistate { journal_ack_count = 0 }) -> {false, State}; @@ -249,11 +235,10 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict, journal_ack_count = JAckCount1 }, if JAckCount1 == 0 -> - {Hdl, State3 = #qistate { hc_state = HCState }} = - get_journal_handle(State2), - {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState), - {ok, HCState2} = file_handle_cache:truncate(Hdl, HCState1), - {false, State3 #qistate { hc_state = HCState2 }}; + {Hdl, State3} = get_journal_handle(State2), + ok = file_handle_cache:position(Hdl, bof), + ok = file_handle_cache:truncate(Hdl), + {false, State3}; JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> flush_journal(State2); true -> @@ -381,36 +366,27 @@ get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls } end. new_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) -> - State1 = #qistate { hc_state = HCState, - seg_num_handles = SegHdls1 } = + State1 = #qistate { seg_num_handles = SegHdls1 } = case dict:size(SegHdls) > 100 of true -> close_all_handles(State); false -> State end, - {{ok, Hdl}, HCState1} = - file_handle_cache:open(Path, Mode, [{write_buffer, infinity}], HCState), - {Hdl, State1 #qistate { hc_state = HCState1, - seg_num_handles = dict:store(Key, Hdl, SegHdls1) }}. + {ok, Hdl} = file_handle_cache:open(Path, Mode, [{write_buffer, infinity}]), + {Hdl, State1 #qistate { seg_num_handles = dict:store(Key, Hdl, SegHdls1) }}. -close_handle(Key, State = #qistate { hc_state = HCState, - seg_num_handles = SegHdls }) -> +close_handle(Key, State = #qistate { seg_num_handles = SegHdls }) -> case dict:find(Key, SegHdls) of {ok, Hdl} -> - {ok, HCState1} = file_handle_cache:close(Hdl, HCState), - State #qistate { hc_state = HCState1, - seg_num_handles = dict:erase(Key, SegHdls) }; + ok = file_handle_cache:close(Hdl), + State #qistate { seg_num_handles = dict:erase(Key, SegHdls) }; error -> State end. -close_all_handles(State = #qistate { hc_state = HCState, - seg_num_handles = SegHdls }) -> - HCState1 = - dict:fold( - fun (_Key, Hdl, HCStateN) -> - {ok, HCStateM} = file_handle_cache:close(Hdl, HCStateN), - HCStateM - end, HCState, SegHdls), - State #qistate { hc_state = HCState1, seg_num_handles = dict:new() }. +close_all_handles(State = #qistate { seg_num_handles = SegHdls }) -> + ok = dict:fold(fun (_Key, Hdl, ok) -> + file_handle_cache:close(Hdl) + end, ok, SegHdls), + State #qistate { seg_num_handles = dict:new() }. bool_to_int(true ) -> 1; bool_to_int(false) -> 0. @@ -507,24 +483,23 @@ find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) -> scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) -> JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME), - {Hdl, State1 = #qistate { hc_state = HCState, - journal_ack_dict = JAckDict }} = + {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} = get_journal_handle(State), %% ADict may well contain duplicates. However, this is ok, due to %% the use of sets in replay_journal_acks_to_segment - {ADict, HCState1} = load_journal(Hdl, JAckDict, HCState), - State2 = close_handle(journal, State1 #qistate { hc_state = HCState1 }), + ADict = load_journal(Hdl, JAckDict), + State2 = close_handle(journal, State1), {TotalMsgCount1, State3} = dict:fold(fun replay_journal_acks_to_segment/3, {TotalMsgCount, State2}, ADict), ok = file:delete(JournalPath), {TotalMsgCount1, State3 #qistate { journal_ack_dict = dict:new() }}. -load_journal(Hdl, ADict, HCState) -> - case file_handle_cache:read(Hdl, ?SEQ_BYTES, HCState) of - {{ok, <<SeqId:?SEQ_BITS>>}, HCState1} -> - load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict), HCState1); - {_ErrOrEoF, HCState1} -> {ADict, HCState1} +load_journal(Hdl, ADict) -> + case file_handle_cache:read(Hdl, ?SEQ_BYTES) of + {ok, <<SeqId:?SEQ_BITS>>} -> + load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict)); + _ErrOrEoF -> ADict end. replay_journal_acks_to_segment(_, [], Acc) -> @@ -554,17 +529,14 @@ deliver_transient(SegNum, SDict, State) -> {[RelSeq | AckMeAcc], DeliverMeAcc} end, {[], []}, SDict), {Hdl, State1} = get_seg_handle(SegNum, State), - {ok, HCState} = - case DeliverMe of - [] -> {ok, State1 #qistate.hc_state}; - _ -> - file_handle_cache:append( - Hdl, - [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ], - State1 #qistate.hc_state) - end, - {AckMe, State1 #qistate { hc_state = HCState }}. + ok = case DeliverMe of + [] -> ok; + _ -> file_handle_cache:append( + Hdl, + [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]) + end, + {AckMe, State1}. %%---------------------------------------------------------------------------- @@ -580,12 +552,11 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, case SegmentExists of false -> {dict:new(), 0, 0, State}; true -> - {Hdl, State1 = #qistate { hc_state = HCState, - journal_ack_dict = JAckDict }} = + {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} = get_seg_handle(SegNum, State), - {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState), - {SDict, AckCount, HighRelSeq, HCState2} = - load_segment_entries(Hdl, dict:new(), 0, 0, HCState1), + ok = file_handle_cache:position(Hdl, bof), + {SDict, AckCount, HighRelSeq} = + load_segment_entries(Hdl, dict:new(), 0, 0), RelSeqs = case dict:find(SegNum, JAckDict) of {ok, RelSeqs1} -> RelSeqs1; error -> [] @@ -594,35 +565,31 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> {dict:erase(RelSeq, SDict2), AckCount2 + 1} end, {SDict, AckCount}, RelSeqs), - {SDict1, AckCount1, HighRelSeq, - State1 #qistate { hc_state = HCState2 }} + {SDict1, AckCount1, HighRelSeq, State1} end. -load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) -> - case file_handle_cache:read(Hdl, 1, HCState) of - {{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>}, HCState1} -> - {{ok, LSB}, HCState2} = - file_handle_cache:read( - Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1), +load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) -> + case file_handle_cache:read(Hdl, 1) of + {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> + {ok, LSB} = file_handle_cache:read( + Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), - load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq, HCState2); - {{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>}, HCState1} -> + load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq); + {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>}, HCState2} = + {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} = file_handle_cache:read( - Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1), + Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), - load_segment_entries( - Hdl, dict:store(RelSeq, {MsgId, false, - 1 == IsPersistentNum}, - SDict), AckCount, HighRelSeq1, HCState2); - {_ErrOrEoF, HCState1} -> - {SDict, AckCount, HighRelSeq, HCState1} + load_segment_entries(Hdl, dict:store(RelSeq, {MsgId, false, + 1 == IsPersistentNum}, + SDict), AckCount, HighRelSeq1); + _ErrOrEoF -> {SDict, AckCount, HighRelSeq} end. deliver_or_ack_msg(SDict, AckCount, RelSeq) -> @@ -665,15 +632,12 @@ append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir }) append_acks_to_segment(SegNum, AckCount, Acks, State) when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT -> {Hdl, State1} = get_seg_handle(SegNum, State), - {AckCount1, HCState} = + {ok, AckCount1} = lists:foldl( - fun (RelSeq, {AckCount2, HCStateN}) -> - {ok, HCStateM} = - file_handle_cache:append( - Hdl, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, HCStateN), - {AckCount2 + 1, HCStateM} - end, {AckCount, State1 #qistate.hc_state}, Acks), - {ok, HCState1} = file_handle_cache:sync(Hdl, HCState), - {AckCount1, State1 #qistate { hc_state = HCState1 }}. + fun (RelSeq, {ok, AckCount2}) -> + {file_handle_cache:append( + Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>), AckCount2 + 1} + end, {ok, AckCount}, Acks), + ok = file_handle_cache:sync(Hdl), + {AckCount1, State1}. |
