summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-23 12:15:41 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-23 12:15:41 +0100
commitced4fd7cf1a74bd0bbbdc18819d632a4aae87be8 (patch)
tree60e77591d935515696e53d1f87cf34fa73ad402d /src
parentabf7da7eb8bc832f594bf030d7ea6c709ad650ff (diff)
downloadrabbitmq-server-git-ced4fd7cf1a74bd0bbbdc18819d632a4aae87be8.tar.gz
Removed state from fhc. The reasoning is as follows:
1) the fhc uses the process dict 2) the fhc needs to receive msgs from its server process as to limiting the age of unused fhs. This is clearly per process as such, it's daft to cater for a process having more than one state for the fhc. Thus any state necessary will also be put in the process dict.
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl113
-rw-r--r--src/rabbit_queue_index.erl194
2 files changed, 134 insertions, 173 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index fe86044bc1..6cb4c094db 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -31,8 +31,8 @@
-module(file_handle_cache).
--export([init/0, open/4, close/2, release/2, read/3, append/3, sync/2,
- position/3, truncate/2, last_sync_offset/2]).
+-export([open/3, close/1, release/1, read/2, append/2, sync/1,
+ position/2, truncate/1, last_sync_offset/1]).
-record(file,
{ reader_count,
@@ -56,11 +56,9 @@
last_used_at
}).
-init() -> empty_state.
-
-open(Path, Mode, Options, State) ->
+open(Path, Mode, Options) ->
case is_appender(Mode) of
- true -> {{error, append_not_supported}, State};
+ true -> {error, append_not_supported};
false ->
Path1 = filename:absname(Path),
case get({Path1, fhc_path}) of
@@ -71,7 +69,7 @@ open(Path, Mode, Options, State) ->
IsWriter = is_writer(Mode1),
case IsWriter andalso HasWriter of
true ->
- {{error, writer_exists}, State};
+ {error, writer_exists};
false ->
RCount1 = case is_reader(Mode1) of
true -> RCount + 1;
@@ -82,10 +80,9 @@ open(Path, Mode, Options, State) ->
reader_count = RCount1,
has_writer = HasWriter orelse IsWriter }),
Ref = make_ref(),
- case open1(Path1, Mode1, Options, Ref, GRef, State)
- of
- {{ok, _Handle}, State} -> {{ok, Ref}, State};
- {Error, State} -> {Error, State}
+ case open1(Path1, Mode1, Options, Ref, GRef) of
+ {ok, _Handle} -> {ok, Ref};
+ Error -> Error
end
end;
undefined ->
@@ -94,13 +91,13 @@ open(Path, Mode, Options, State) ->
put({GRef, fhc_file},
#file { reader_count = 0, has_writer = false,
path = Path1 }),
- open(Path, Mode, Options, State)
+ open(Path, Mode, Options)
end
end.
-close(Ref, State) ->
+close(Ref) ->
case erase({Ref, fhc_handle}) of
- undefined -> {ok, State};
+ undefined -> ok;
Handle ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl, global_key = GRef,
@@ -124,21 +121,21 @@ close(Ref, State) ->
File #file { reader_count = RCount1,
has_writer = HasWriter1 })
end,
- {ok, State};
+ ok;
{Error, Handle1} ->
put({Ref, fhc_handle}, Handle1),
- {Error, State}
+ Error
end
end.
-release(_Ref, State) -> %% noop just for now
- {ok, State}.
+release(_Ref) -> %% noop just for now
+ ok.
-read(Ref, Count, State) ->
- case get_or_reopen(Ref, State) of
- {{ok, #handle { is_read = false }}, State1} ->
- {{error, not_open_for_reading}, State1};
- {{ok, Handle}, State1} ->
+read(Ref, Count) ->
+ case get_or_reopen(Ref) of
+ {ok, #handle { is_read = false }} ->
+ {error, not_open_for_reading};
+ {ok, Handle} ->
{Result, Handle1} =
case write_buffer(Handle) of
{ok, Handle2 = #handle { hdl = Hdl, offset = Offset }} ->
@@ -153,15 +150,15 @@ read(Ref, Count, State) ->
{Error, Handle2} -> {Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State1};
+ Result;
ErrorAndState -> ErrorAndState
end.
-append(Ref, Data, State) ->
- case get_or_reopen(Ref, State) of
- {{ok, #handle { is_write = false }}, State1} ->
- {{error, not_open_for_writing}, State1};
- {{ok, Handle}, State1} ->
+append(Ref, Data) ->
+ case get_or_reopen(Ref) of
+ {ok, #handle { is_write = false }} ->
+ {error, not_open_for_writing};
+ {ok, Handle} ->
{Result, Handle1} =
case maybe_seek(eof, Handle) of
{ok, Handle2 = #handle { at_eof = true }} ->
@@ -170,54 +167,54 @@ append(Ref, Data, State) ->
{Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State1};
+ Result;
ErrorAndState -> ErrorAndState
end.
-last_sync_offset(Ref, State) ->
- case get_or_reopen(Ref, State) of
- {{ok, #handle { trusted_offset = TrustedOffset }}, State1} ->
- {{ok, TrustedOffset}, State1};
+last_sync_offset(Ref) ->
+ case get_or_reopen(Ref) of
+ {ok, #handle { trusted_offset = TrustedOffset }} ->
+ {ok, TrustedOffset};
ErrorAndState -> ErrorAndState
end.
-position(Ref, NewOffset, State) ->
- case get_or_reopen(Ref, State) of
- {{ok, Handle}, State1} ->
+position(Ref, NewOffset) ->
+ case get_or_reopen(Ref) of
+ {ok, Handle} ->
{Result, Handle1} =
case write_buffer(Handle) of
{ok, Handle2} -> maybe_seek(NewOffset, Handle2);
{Error, Handle2} -> {Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State1};
+ Result;
ErrorAndState -> ErrorAndState
end.
-sync(Ref, State) ->
- case get_or_reopen(Ref, State) of
- {{ok, Handle = #handle { write_buffer = [], hdl = Hdl,
- offset = Offset }}, State1} ->
+sync(Ref) ->
+ case get_or_reopen(Ref) of
+ {ok, Handle = #handle { write_buffer = [], hdl = Hdl,
+ offset = Offset }} ->
{Result, Handle1} =
case file:sync(Hdl) of
ok -> {ok, Handle #handle { trusted_offset = Offset }};
Error -> {Error, Handle}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State1};
- {{ok, Handle = #handle { at_eof = true }}, State1} ->
+ Result;
+ {ok, Handle = #handle { at_eof = true }} ->
%% we can't have content in the buffer without being at eof
{Result, Handle1} = write_buffer(Handle),
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now() }),
- {Result, State1};
+ Result;
ErrorAndState -> ErrorAndState
end.
-truncate(Ref, State) ->
- case get_or_reopen(Ref, State) of
- {{ok, #handle { is_write = false }}, State1} ->
- {{error, not_open_for_writing}, State1};
- {{ok, Handle}, State1} ->
+truncate(Ref) ->
+ case get_or_reopen(Ref) of
+ {ok, #handle { is_write = false }} ->
+ {error, not_open_for_writing};
+ {ok, Handle} ->
{Result, Handle1} =
case write_buffer(Handle) of
{ok,
@@ -236,21 +233,21 @@ truncate(Ref, State) ->
{Error, Handle2} -> {Error, Handle2}
end,
put({Ref, fhc_handle}, Handle1 #handle { last_used_at = now () }),
- {Result, State1};
+ Result;
ErrorAndState -> ErrorAndState
end.
-get_or_reopen(Ref, State) ->
+get_or_reopen(Ref) ->
case get({Ref, fhc_handle}) of
- undefined -> {{error, not_open}, State};
+ undefined -> {error, not_open};
#handle { hdl = closed, mode = Mode, global_key = GRef,
options = Options } ->
#file { path = Path } = get({GRef, fhc_file}),
- open1(Path, Mode, Options, Ref, GRef, State);
- Handle -> {{ok, Handle}, State}
+ open1(Path, Mode, Options, Ref, GRef);
+ Handle -> {ok, Handle}
end.
-open1(Path, Mode, Options, Ref, GRef, State) ->
+open1(Path, Mode, Options, Ref, GRef) ->
case file:open(Path, Mode) of
{ok, Hdl} ->
WriteBufferSize =
@@ -267,9 +264,9 @@ open1(Path, Mode, Options, Ref, GRef, State) ->
is_write = is_writer(Mode), is_read = is_reader(Mode),
global_key = GRef, last_used_at = now() },
put({Ref, fhc_handle}, Handle),
- {{ok, Handle}, State};
+ {ok, Handle};
{error, Reason} ->
- {{error, Reason}, State}
+ {error, Reason}
end.
maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, at_eof = AtEoF,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 50f013f89b..57abfa9dc0 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -109,7 +109,6 @@
-record(qistate,
{ dir,
seg_num_handles,
- hc_state,
journal_ack_count,
journal_ack_dict,
seg_ack_counts
@@ -125,7 +124,6 @@
-type(seq_id() :: integer()).
-type(qistate() :: #qistate { dir :: file_path(),
seg_num_handles :: dict(),
- hc_state :: any(),
journal_ack_count :: integer(),
journal_ack_dict :: dict(),
seg_ack_counts :: dict()
@@ -156,13 +154,11 @@
%%----------------------------------------------------------------------------
init(Name) ->
- HCState = file_handle_cache:init(),
StrName = queue_name_to_dir_name(Name),
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
State = #qistate { dir = Dir,
seg_num_handles = dict:new(),
- hc_state = HCState,
journal_ack_count = 0,
journal_ack_dict = dict:new(),
seg_ack_counts = dict:new() },
@@ -185,38 +181,31 @@ write_published(MsgId, SeqId, IsPersistent, State)
?MSG_ID_BYTES = size(MsgId),
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
{Hdl, State1} = get_seg_handle(SegNum, State),
- {ok, HCState} =
- file_handle_cache:append(Hdl,
- <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS, MsgId/binary>>,
- State1 #qistate.hc_state),
- State1 #qistate { hc_state = HCState }.
+ ok = file_handle_cache:append(Hdl,
+ <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, MsgId/binary>>),
+ State1.
write_delivered(SeqId, State) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
{Hdl, State1} = get_seg_handle(SegNum, State),
- {ok, HCState} = file_handle_cache:append(
- Hdl,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- State1 #qistate.hc_state),
- State1 #qistate { hc_state = HCState }.
+ ok = file_handle_cache:append(
+ Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>),
+ State1.
write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict,
journal_ack_count = JAckCount }) ->
{Hdl, State1} = get_journal_handle(State),
- {JAckDict1, JAckCount1, HCState} =
+ {JAckDict1, JAckCount1} =
lists:foldl(
- fun (SeqId, {JAckDict2, JAckCount2, HCStateN}) ->
- {ok, HCStateM} = file_handle_cache:append(
- Hdl, <<SeqId:?SEQ_BITS>>, HCStateN),
- {add_ack_to_ack_dict(SeqId, JAckDict2),
- JAckCount2 + 1, HCStateM}
- end, {JAckDict, JAckCount, State1 #qistate.hc_state}, SeqIds),
+ fun (SeqId, {JAckDict2, JAckCount2}) ->
+ ok = file_handle_cache:append(Hdl, <<SeqId:?SEQ_BITS>>),
+ {add_ack_to_ack_dict(SeqId, JAckDict2), JAckCount2 + 1}
+ end, {JAckDict, JAckCount}, SeqIds),
State2 = State1 #qistate { journal_ack_dict = JAckDict1,
- journal_ack_count = JAckCount1,
- hc_state = HCState },
+ journal_ack_count = JAckCount1 },
case JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT of
true -> full_flush_journal(State2);
false -> State2
@@ -228,14 +217,11 @@ full_flush_journal(State) ->
{false, State1} -> State1
end.
-sync_all(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) ->
- HCState1 =
- dict:fold(
- fun (_Key, Hdl, HCStateN) ->
- {ok, HCStateM} = file_handle_cache:sync(Hdl, HCStateN),
- HCStateM
- end, HCState, SegHdls),
- State #qistate { hc_state = HCState1 }.
+sync_all(State = #qistate { seg_num_handles = SegHdls }) ->
+ ok = dict:fold(fun (_Key, Hdl, ok) ->
+ file_handle_cache:sync(Hdl)
+ end, ok, SegHdls),
+ State.
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
{false, State};
@@ -249,11 +235,10 @@ flush_journal(State = #qistate { journal_ack_dict = JAckDict,
journal_ack_count = JAckCount1 },
if
JAckCount1 == 0 ->
- {Hdl, State3 = #qistate { hc_state = HCState }} =
- get_journal_handle(State2),
- {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState),
- {ok, HCState2} = file_handle_cache:truncate(Hdl, HCState1),
- {false, State3 #qistate { hc_state = HCState2 }};
+ {Hdl, State3} = get_journal_handle(State2),
+ ok = file_handle_cache:position(Hdl, bof),
+ ok = file_handle_cache:truncate(Hdl),
+ {false, State3};
JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
flush_journal(State2);
true ->
@@ -381,36 +366,27 @@ get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls }
end.
new_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) ->
- State1 = #qistate { hc_state = HCState,
- seg_num_handles = SegHdls1 } =
+ State1 = #qistate { seg_num_handles = SegHdls1 } =
case dict:size(SegHdls) > 100 of
true -> close_all_handles(State);
false -> State
end,
- {{ok, Hdl}, HCState1} =
- file_handle_cache:open(Path, Mode, [{write_buffer, infinity}], HCState),
- {Hdl, State1 #qistate { hc_state = HCState1,
- seg_num_handles = dict:store(Key, Hdl, SegHdls1) }}.
+ {ok, Hdl} = file_handle_cache:open(Path, Mode, [{write_buffer, infinity}]),
+ {Hdl, State1 #qistate { seg_num_handles = dict:store(Key, Hdl, SegHdls1) }}.
-close_handle(Key, State = #qistate { hc_state = HCState,
- seg_num_handles = SegHdls }) ->
+close_handle(Key, State = #qistate { seg_num_handles = SegHdls }) ->
case dict:find(Key, SegHdls) of
{ok, Hdl} ->
- {ok, HCState1} = file_handle_cache:close(Hdl, HCState),
- State #qistate { hc_state = HCState1,
- seg_num_handles = dict:erase(Key, SegHdls) };
+ ok = file_handle_cache:close(Hdl),
+ State #qistate { seg_num_handles = dict:erase(Key, SegHdls) };
error -> State
end.
-close_all_handles(State = #qistate { hc_state = HCState,
- seg_num_handles = SegHdls }) ->
- HCState1 =
- dict:fold(
- fun (_Key, Hdl, HCStateN) ->
- {ok, HCStateM} = file_handle_cache:close(Hdl, HCStateN),
- HCStateM
- end, HCState, SegHdls),
- State #qistate { hc_state = HCState1, seg_num_handles = dict:new() }.
+close_all_handles(State = #qistate { seg_num_handles = SegHdls }) ->
+ ok = dict:fold(fun (_Key, Hdl, ok) ->
+ file_handle_cache:close(Hdl)
+ end, ok, SegHdls),
+ State #qistate { seg_num_handles = dict:new() }.
bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.
@@ -507,24 +483,23 @@ find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) ->
scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) ->
JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
- {Hdl, State1 = #qistate { hc_state = HCState,
- journal_ack_dict = JAckDict }} =
+ {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} =
get_journal_handle(State),
%% ADict may well contain duplicates. However, this is ok, due to
%% the use of sets in replay_journal_acks_to_segment
- {ADict, HCState1} = load_journal(Hdl, JAckDict, HCState),
- State2 = close_handle(journal, State1 #qistate { hc_state = HCState1 }),
+ ADict = load_journal(Hdl, JAckDict),
+ State2 = close_handle(journal, State1),
{TotalMsgCount1, State3} =
dict:fold(fun replay_journal_acks_to_segment/3,
{TotalMsgCount, State2}, ADict),
ok = file:delete(JournalPath),
{TotalMsgCount1, State3 #qistate { journal_ack_dict = dict:new() }}.
-load_journal(Hdl, ADict, HCState) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES, HCState) of
- {{ok, <<SeqId:?SEQ_BITS>>}, HCState1} ->
- load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict), HCState1);
- {_ErrOrEoF, HCState1} -> {ADict, HCState1}
+load_journal(Hdl, ADict) ->
+ case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
+ {ok, <<SeqId:?SEQ_BITS>>} ->
+ load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict));
+ _ErrOrEoF -> ADict
end.
replay_journal_acks_to_segment(_, [], Acc) ->
@@ -554,17 +529,14 @@ deliver_transient(SegNum, SDict, State) ->
{[RelSeq | AckMeAcc], DeliverMeAcc}
end, {[], []}, SDict),
{Hdl, State1} = get_seg_handle(SegNum, State),
- {ok, HCState} =
- case DeliverMe of
- [] -> {ok, State1 #qistate.hc_state};
- _ ->
- file_handle_cache:append(
- Hdl,
- [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ],
- State1 #qistate.hc_state)
- end,
- {AckMe, State1 #qistate { hc_state = HCState }}.
+ ok = case DeliverMe of
+ [] -> ok;
+ _ -> file_handle_cache:append(
+ Hdl,
+ [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ])
+ end,
+ {AckMe, State1}.
%%----------------------------------------------------------------------------
@@ -580,12 +552,11 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
case SegmentExists of
false -> {dict:new(), 0, 0, State};
true ->
- {Hdl, State1 = #qistate { hc_state = HCState,
- journal_ack_dict = JAckDict }} =
+ {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} =
get_seg_handle(SegNum, State),
- {ok, HCState1} = file_handle_cache:position(Hdl, bof, HCState),
- {SDict, AckCount, HighRelSeq, HCState2} =
- load_segment_entries(Hdl, dict:new(), 0, 0, HCState1),
+ ok = file_handle_cache:position(Hdl, bof),
+ {SDict, AckCount, HighRelSeq} =
+ load_segment_entries(Hdl, dict:new(), 0, 0),
RelSeqs = case dict:find(SegNum, JAckDict) of
{ok, RelSeqs1} -> RelSeqs1;
error -> []
@@ -594,35 +565,31 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
lists:foldl(fun (RelSeq, {SDict2, AckCount2}) ->
{dict:erase(RelSeq, SDict2), AckCount2 + 1}
end, {SDict, AckCount}, RelSeqs),
- {SDict1, AckCount1, HighRelSeq,
- State1 #qistate { hc_state = HCState2 }}
+ {SDict1, AckCount1, HighRelSeq, State1}
end.
-load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) ->
- case file_handle_cache:read(Hdl, 1, HCState) of
- {{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>}, HCState1} ->
- {{ok, LSB}, HCState2} =
- file_handle_cache:read(
- Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1),
+load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) ->
+ case file_handle_cache:read(Hdl, 1) of
+ {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
+ {ok, LSB} = file_handle_cache:read(
+ Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
{SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
- load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq, HCState2);
- {{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>}, HCState1} ->
+ load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq);
+ {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+ IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
- {{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>}, HCState2} =
+ {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
file_handle_cache:read(
- Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1),
+ Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
- load_segment_entries(
- Hdl, dict:store(RelSeq, {MsgId, false,
- 1 == IsPersistentNum},
- SDict), AckCount, HighRelSeq1, HCState2);
- {_ErrOrEoF, HCState1} ->
- {SDict, AckCount, HighRelSeq, HCState1}
+ load_segment_entries(Hdl, dict:store(RelSeq, {MsgId, false,
+ 1 == IsPersistentNum},
+ SDict), AckCount, HighRelSeq1);
+ _ErrOrEoF -> {SDict, AckCount, HighRelSeq}
end.
deliver_or_ack_msg(SDict, AckCount, RelSeq) ->
@@ -665,15 +632,12 @@ append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir })
append_acks_to_segment(SegNum, AckCount, Acks, State)
when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT ->
{Hdl, State1} = get_seg_handle(SegNum, State),
- {AckCount1, HCState} =
+ {ok, AckCount1} =
lists:foldl(
- fun (RelSeq, {AckCount2, HCStateN}) ->
- {ok, HCStateM} =
- file_handle_cache:append(
- Hdl,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>, HCStateN),
- {AckCount2 + 1, HCStateM}
- end, {AckCount, State1 #qistate.hc_state}, Acks),
- {ok, HCState1} = file_handle_cache:sync(Hdl, HCState),
- {AckCount1, State1 #qistate { hc_state = HCState1 }}.
+ fun (RelSeq, {ok, AckCount2}) ->
+ {file_handle_cache:append(
+ Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>), AckCount2 + 1}
+ end, {ok, AckCount}, Acks),
+ ok = file_handle_cache:sync(Hdl),
+ {AckCount1, State1}.