diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-20 17:28:06 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-20 17:28:06 +0100 |
| commit | 92addf3ac62337fdf3c25b058d5f1f8a1aff1e48 (patch) | |
| tree | e6e3a756f1ba6eca334c1f7672369b70ba65dd0a /src | |
| parent | 588d183c81a77b93543ade52583d32b8759ee78e (diff) | |
| download | rabbitmq-server-git-92addf3ac62337fdf3c25b058d5f1f8a1aff1e48.tar.gz | |
well, it's about 4 times slower, but the dumb fhc works on the queue index
Diffstat (limited to 'src')
| -rw-r--r-- | src/horrendously_dumb_file_handle_cache.erl | 263 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 476 |
2 files changed, 528 insertions, 211 deletions
diff --git a/src/horrendously_dumb_file_handle_cache.erl b/src/horrendously_dumb_file_handle_cache.erl new file mode 100644 index 0000000000..5b2bcb6cce --- /dev/null +++ b/src/horrendously_dumb_file_handle_cache.erl @@ -0,0 +1,263 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(horrendously_dumb_file_handle_cache). + +-export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2, + position/3, truncate/2, with_file_handle_at/4, sync_to_offset/3]). + +-record(hcstate, + { ref_entry, path_mode_ref }). + +-record(entry, + { hdl, + current_offset, + last_sync_offset, + is_dirty, + is_append, + path_mode_key }). 
%% Fresh, empty cache state: no open handles, no path index.
init() ->
    #hcstate { ref_entry = dict:new(), path_mode_ref = dict:new() }.

%% Open Path with the given Mode options, returning {{ok, Ref}, State1}
%% where Ref identifies the cached handle, or {{error, Reason}, State}.
%% A handle already open for the same absolute path and normalised
%% (sorted, deduplicated) mode list is reused rather than reopened.
open(Path, Mode, [] = _ExtraOptions,
     State = #hcstate { path_mode_ref = PathIndex }) ->
    SortedMode = lists:usort(Mode),
    Key = {filename:absname(Path), SortedMode},
    case dict:find(Key, PathIndex) of
        {ok, Ref} ->
            %% same path+mode already open: hand back the existing ref
            {{ok, Ref}, State};
        error ->
            open_new(Key, SortedMode, State)
    end.

%% Open the file for real and record a fresh cache entry under a new ref.
open_new({AbsPath, _Mode} = Key, SortedMode,
         State = #hcstate { ref_entry = Refs, path_mode_ref = PathIndex }) ->
    case file:open(AbsPath, SortedMode) of
        {ok, Hdl} ->
            Ref = make_ref(),
            Entry = #entry { hdl              = Hdl,
                             current_offset   = 0,
                             last_sync_offset = 0,
                             is_dirty         = false,
                             is_append        = lists:member(append, SortedMode),
                             path_mode_key    = Key },
            {{ok, Ref},
             State #hcstate {
               ref_entry     = dict:store(Ref, Entry, Refs),
               path_mode_ref = dict:store(Key, Ref, PathIndex) }};
        {error, _Reason} = Error ->
            {Error, State}
    end.

%% Close the handle behind Ref, syncing first if it has unflushed writes,
%% and drop it from both cache indices. Closing an unknown Ref is a no-op.
%% Always returns {ok, State1}.
close(Ref, State = #hcstate { ref_entry = Refs, path_mode_ref = PathIndex }) ->
    case dict:find(Ref, Refs) of
        error ->
            {ok, State};
        {ok, #entry { hdl = Hdl, is_dirty = Dirty, path_mode_key = Key }} ->
            ok = maybe_sync(Hdl, Dirty),
            ok = file:close(Hdl),
            {ok, State #hcstate { ref_entry     = dict:erase(Ref, Refs),
                                  path_mode_ref = dict:erase(Key, PathIndex) }}
    end.

%% Sync only when the handle has unflushed writes.
maybe_sync(Hdl, true)   -> file:sync(Hdl);
maybe_sync(_Hdl, false) -> ok.

%% Placeholder: releasing a handle back to the cache currently does nothing.
release(_Ref, State) ->
    {ok, State}.
%% Read Count bytes from the handle behind Ref, starting at Offset
%% ('cur' means the tracked current offset; any other value is passed to
%% file:position/2). Returns {file:read's result, State1}, or
%% {{error, not_open}, State} for an unknown ref. If the handle has
%% unflushed writes and the read would reach past the last synced
%% offset, the handle is synced first -- presumably so the read observes
%% our own buffered (delayed_write) data; TODO confirm.
%% NOTE(review): current_offset is advanced by the full Count regardless
%% of how many bytes file:read/2 actually returns, so a short read near
%% eof leaves the tracked offset past the handle's true position.
read(Ref, Offset, Count, State = #hcstate { ref_entry = RefEntry }) ->
    case dict:find(Ref, RefEntry) of
        {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
                              last_sync_offset = LastSyncOffset,
                              is_dirty = IsDirty }} ->
            %% the offset the read is expected to end at
            NewOffset = Count +
                case Offset of
                    cur -> OldOffset;
                    _   -> {ok, RealOff} = file:position(Hdl, Offset),
                           RealOff
                end,
            {IsDirty1, LastSyncOffset1} =
                case IsDirty andalso NewOffset > LastSyncOffset of
                    true  -> ok = file:sync(Hdl),
                             %% file:sync flushes everything, so record the
                             %% furthest offset we know about as synced
                             {false, lists:max([NewOffset, OldOffset])};
                    false -> {IsDirty, LastSyncOffset}
                end,
            Entry1 = Entry #entry { current_offset = NewOffset,
                                    last_sync_offset = LastSyncOffset1,
                                    is_dirty = IsDirty1 },
            State1 = State #hcstate { ref_entry = dict:store(Ref, Entry1,
                                                             RefEntry) },
            {file:read(Hdl, Count), State1};
        error -> {{error, not_open}, State}
    end.

%% Write Data to the handle behind Ref at Offset ('cur' or a value for
%% file:position/2). Returns {file:write's result, State1}, or
%% {{error, not_open}, State} for an unknown ref. Marks the entry dirty.
%% if the file was opened in append mode, then Offset is ignored, as
%% it would only affect the read head for this file.
write(Ref, Offset, Data, State = #hcstate { ref_entry = RefEntry }) ->
    case dict:find(Ref, RefEntry) of
        {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
                              is_append = IsAppend }} ->
            NewOffset =
                case IsAppend of
                    true ->
                        %% append-mode writes go to eof; the tracked
                        %% (read) offset is left where it was
                        OldOffset;
                    false ->
                        %% end-of-write offset = start position + size
                        size_of_write_data(Data) +
                            case Offset of
                                cur -> OldOffset;
                                _ -> {ok, RealOff} = file:position(Hdl, Offset),
                                     RealOff
                            end
                end,
            Entry1 = Entry #entry { current_offset = NewOffset,
                                    is_dirty = true },
            State1 = State #hcstate { ref_entry = dict:store(Ref, Entry1,
                                                             RefEntry) },
            {file:write(Hdl, Data), State1};
        error -> {{error, not_open}, State}
    end.
%% Force the handle behind Ref to disk if it has unflushed writes;
%% a clean handle is a no-op. Returns {ok, State1} or
%% {{error, not_open}, State}.
sync(Ref, State = #hcstate { ref_entry = RefEntry }) ->
    case dict:find(Ref, RefEntry) of
        {ok, Entry = #entry { hdl = Hdl, current_offset = Offset,
                              last_sync_offset = LastSyncOffset,
                              is_dirty = true }} ->
            %% everything written so far is flushed, so the sync mark
            %% advances to the furthest offset we have tracked
            SyncOffset = lists:max([Offset, LastSyncOffset]),
            ok = file:sync(Hdl),
            Entry1 = Entry #entry { last_sync_offset = SyncOffset,
                                    is_dirty = false },
            {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
                                                         RefEntry) }};
        {ok, _Entry_not_dirty} ->
            {ok, State};
        error -> {{error, not_open}, State}
    end.

%% Seek the handle behind Ref to NewOffset ('bof'/'eof'/{...} forms are
%% passed straight to file:position/2). Returns {ok, State1} or
%% {{error, not_open}, State}. Seeking to the already-tracked offset is
%% a no-op (only possible when NewOffset is an integer, since
%% current_offset always is).
position(Ref, NewOffset, State = #hcstate { ref_entry = RefEntry }) ->
    case dict:find(Ref, RefEntry) of
        {ok, #entry { current_offset = NewOffset }} ->
            {ok, State};
        {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
                              last_sync_offset = LastSyncOffset,
                              is_dirty = IsDirty }} ->
            {ok, RealOff} = file:position(Hdl, NewOffset),
            {IsDirty1, LastSyncOffset1} =
                case {IsDirty, RealOff > LastSyncOffset} of
                    {true, true} ->
                        %% dirty and seeking past the sync mark: flush so
                        %% later reads there see our data
                        ok = file:sync(Hdl),
                        {false, lists:max([RealOff, OldOffset])};
                    {false, true} ->
                        %% NOTE(review): advances the sync mark without a
                        %% sync -- presumably safe because a clean handle
                        %% has nothing unflushed; confirm
                        {false, RealOff};
                    _ ->
                        {IsDirty, LastSyncOffset}
                end,
            Entry1 = Entry #entry { current_offset = RealOff,
                                    last_sync_offset = LastSyncOffset1,
                                    is_dirty = IsDirty1 },
            {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
                                                         RefEntry) }};
        error ->
            {{error, not_open}, State}
    end.

%% Truncate the file behind Ref at its current position (file:truncate/1
%% cuts at the handle's seek position), syncing first if dirty.
%% Returns {ok, State1} or {{error, not_open}, State}.
truncate(Ref, State = #hcstate { ref_entry = RefEntry }) ->
    case dict:find(Ref, RefEntry) of
        {ok, Entry = #entry { hdl = Hdl, current_offset = Offset,
                              last_sync_offset = LastSyncOffset,
                              is_dirty = IsDirty }} ->
            ok = case IsDirty of
                     true -> file:sync(Hdl);
                     false -> ok
                 end,
            %% data past the truncation point is gone, so the sync mark
            %% cannot exceed the new end of file
            LastSyncOffset1 = lists:min([Offset, LastSyncOffset]),
            ok = file:truncate(Hdl),
            Entry1 = Entry #entry { last_sync_offset = LastSyncOffset1,
                                    is_dirty = false },
            {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
                                                         RefEntry) }};
        error -> {{error, not_open}, State}
    end.
%% Position the handle behind Ref at Offset and run Fun(Hdl) on the raw
%% io_device. Fun must return {NewOffset, Result}: the offset it left
%% the handle at, and the value handed back to the caller as
%% {Result, State1}. If the handle is dirty it is synced before Fun
%% runs. The entry is marked dirty afterwards (conservatively -- Fun may
%% only have read; NOTE(review): a read-only Fun leaves the entry
%% spuriously dirty, costing at most an extra sync later).
with_file_handle_at(Ref, Offset, Fun,
                    State = #hcstate { ref_entry = RefEntry }) ->
    case dict:find(Ref, RefEntry) of
        {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
                              last_sync_offset = LastSyncOffset,
                              is_dirty = IsDirty }} ->
            Offset1 =
                case Offset of
                    cur -> OldOffset;
                    %% already positioned there: skip the seek
                    OldOffset -> OldOffset;
                    _ -> {ok, RealOff} = file:position(Hdl, Offset),
                         RealOff
                end,
            LastSyncOffset1 =
                case IsDirty of
                    true -> ok = file:sync(Hdl),
                            lists:max([Offset1, OldOffset]);
                    false -> LastSyncOffset
                end,
            {Offset2, Result} = Fun(Hdl),
            Entry1 = Entry #entry { current_offset = Offset2,
                                    last_sync_offset = LastSyncOffset1,
                                    is_dirty = true },
            State1 = State #hcstate { ref_entry = dict:store(Ref, Entry1,
                                                             RefEntry) },
            {Result, State1};
        error -> {{error, not_open}, State}
    end.

%% Ensure the handle behind Ref is synced at least up to Offset ('cur'
%% means up to the tracked current offset). A clean handle, or one whose
%% sync mark already covers Offset, is a no-op. Returns {ok, State1} or
%% {{error, not_open}, State}.
%% NOTE(review): when Offset =:= cur the second disjunct of the guard
%% compares the atom 'cur' with an integer; atoms sort after numbers in
%% Erlang term order, so that disjunct is always true and a dirty handle
%% is synced even when CurOffset =< LastSyncOffset. Left as-is: the
%% extra sync is harmless, and tightening the guard could skip flushing
%% regions overwritten below the sync mark.
sync_to_offset(Ref, Offset, State = #hcstate { ref_entry = RefEntry }) ->
    case dict:find(Ref, RefEntry) of
        {ok, Entry = #entry { hdl = Hdl, last_sync_offset = LastSyncOffset,
                              current_offset = CurOffset, is_dirty = true }}
        when (Offset =:= cur andalso CurOffset > LastSyncOffset)
             orelse (Offset > LastSyncOffset) ->
            ok = file:sync(Hdl),
            LastSyncOffset1 =
                case Offset of
                    cur -> lists:max([LastSyncOffset, CurOffset]);
                    _   -> lists:max([LastSyncOffset, CurOffset, Offset])
                end,
            Entry1 = Entry #entry { last_sync_offset = LastSyncOffset1,
                                    is_dirty = false },
            {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
                                                         RefEntry) }};
        {ok, _Entry} ->
            %% clean handle, or already synced to/past the requested
            %% offset: nothing to do. (This clause was missing, so such
            %% calls crashed with a case_clause error; cf. sync/2's
            %% _Entry_not_dirty clause.)
            {ok, State};
        error -> {{error, not_open}, State}
    end.

%% Total byte size of an iodata term (nested lists of binaries and raw
%% bytes), as file:write/2 would consume it.
size_of_write_data(Data) ->
    size_of_write_data(Data, 0).

%% Accumulator walk over the iodata structure.
size_of_write_data([], Acc) ->
    Acc;
size_of_write_data([A|B], Acc) ->
    %% head may itself be iodata; an improper binary tail lands in the
    %% is_binary clause below
    size_of_write_data(B, size_of_write_data(A, Acc));
size_of_write_data(Bin, Acc) when is_binary(Bin) ->
    byte_size(Bin) + Acc;
size_of_write_data(Byte, Acc) when is_integer(Byte) ->
    %% iodata may contain raw bytes (0..255) as list elements, e.g.
    %% plain strings; previously this crashed with function_clause
    Acc + 1.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3471913f65..3a21c23625 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -108,11 +108,10 @@ -record(qistate, { dir, - cur_seg_num, - cur_seg_hdl, + seg_num_handles, + hc_state, journal_ack_count, journal_ack_dict, - journal_handle, seg_ack_counts }). @@ -124,14 +123,11 @@ -type(msg_id() :: binary()). -type(seq_id() :: integer()). --type(int_or_undef() :: integer() | 'undefined'). --type(io_dev_or_undef() :: io_device() | 'undefined'). -type(qistate() :: #qistate { dir :: file_path(), - cur_seg_num :: int_or_undef(), - cur_seg_hdl :: io_dev_or_undef(), + seg_num_handles :: dict(), + hc_state :: any(), journal_ack_count :: integer(), journal_ack_dict :: dict(), - journal_handle :: io_device(), seg_ack_counts :: dict() }). @@ -154,38 +150,30 @@ -endif. + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- init(Name) -> + HCState = horrendously_dumb_file_handle_cache:init(), StrName = queue_name_to_dir_name(Name), Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - {TotalMsgCount, AckCounts, TransientADict} = - find_ack_counts_and_deliver_transient_msgs(Dir), - {TotalMsgCount1, AckCounts1} = - scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict), - {ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME), - [raw, binary, delayed_write, write, read]), - {TotalMsgCount1, #qistate { dir = Dir, - cur_seg_num = undefined, - cur_seg_hdl = undefined, - journal_ack_count = 0, - journal_ack_dict = dict:new(), - journal_handle = JournalHdl, - seg_ack_counts = AckCounts1 - }}. 
- -terminate(State = #qistate { journal_handle = undefined }) -> - State; -terminate(State) -> - State1 = #qistate { cur_seg_num = SegNum } = full_flush_journal(State), - State2 = #qistate { journal_handle = JournalHdl } = - close_file_handle_for_seg(SegNum, State1), - ok = file:sync(JournalHdl), - ok = file:close(JournalHdl), - State2 #qistate { journal_handle = undefined }. + State = #qistate { dir = Dir, + seg_num_handles = dict:new(), + hc_state = HCState, + journal_ack_count = 0, + journal_ack_dict = dict:new(), + seg_ack_counts = dict:new() }, + {TotalMsgCount, State1} = find_ack_counts_and_deliver_transient_msgs(State), + scatter_journal(TotalMsgCount, State1). + +terminate(State = #qistate { seg_num_handles = SegHdls }) -> + case 0 == dict:size(SegHdls) of + true -> State; + false -> close_all_handles(full_flush_journal(State)) + end. terminate_and_erase(State) -> State1 = terminate(State), @@ -196,35 +184,43 @@ write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - {Hdl, State1} = get_file_handle_for_seg(SegNum, State), - ok = file:write(Hdl, - <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS, MsgId/binary>>), - State1. + {Hdl, State1} = get_seg_handle(SegNum, State), + {ok, HCState} = horrendously_dumb_file_handle_cache:write( + Hdl, eof, + <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, MsgId/binary>>, + State1 #qistate.hc_state), + State1 #qistate { hc_state = HCState }. write_delivered(SeqId, State) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), - {Hdl, State1} = get_file_handle_for_seg(SegNum, State), - ok = file:write(Hdl, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>), - State1. 
- -write_acks(SeqIds, State = #qistate { journal_handle = JournalHdl, - journal_ack_dict = JAckDict, + {Hdl, State1} = get_seg_handle(SegNum, State), + {ok, HCState} = horrendously_dumb_file_handle_cache:write( + Hdl, eof, + <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + State1 #qistate.hc_state), + State1 #qistate { hc_state = HCState }. + +write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict, journal_ack_count = JAckCount }) -> - {JAckDict1, JAckCount1} = + {Hdl, State1} = get_journal_handle(State), + {JAckDict1, JAckCount1, HCState} = lists:foldl( - fun (SeqId, {JAckDict2, JAckCount2}) -> - ok = file:write(JournalHdl, <<SeqId:?SEQ_BITS>>), - {add_ack_to_ack_dict(SeqId, JAckDict2), JAckCount2 + 1} - end, {JAckDict, JAckCount}, SeqIds), - State1 = State #qistate { journal_ack_dict = JAckDict1, - journal_ack_count = JAckCount1 }, + fun (SeqId, {JAckDict2, JAckCount2, HCStateN}) -> + {ok, HCStateM} = + horrendously_dumb_file_handle_cache:write( + Hdl, eof, <<SeqId:?SEQ_BITS>>, HCStateN), + {add_ack_to_ack_dict(SeqId, JAckDict2), + JAckCount2 + 1, HCStateM} + end, {JAckDict, JAckCount, State1 #qistate.hc_state}, SeqIds), + State2 = State1 #qistate { journal_ack_dict = JAckDict1, + journal_ack_count = JAckCount1, + hc_state = HCState }, case JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT of - true -> full_flush_journal(State1); - false -> State1 + true -> full_flush_journal(State2); + false -> State2 end. 
full_flush_journal(State) -> @@ -235,36 +231,32 @@ full_flush_journal(State) -> flush_journal(State = #qistate { journal_ack_count = 0 }) -> {false, State}; -flush_journal(State = #qistate { journal_handle = JournalHdl, - journal_ack_dict = JAckDict, - journal_ack_count = JAckCount, - seg_ack_counts = AckCounts, - dir = Dir }) -> +flush_journal(State = #qistate { journal_ack_dict = JAckDict, + journal_ack_count = JAckCount }) -> [SegNum|_] = dict:fetch_keys(JAckDict), Acks = dict:fetch(SegNum, JAckDict), - SegPath = seg_num_to_path(Dir, SegNum), - State1 = close_file_handle_for_seg(SegNum, State), - AckCounts1 = append_acks_to_segment(SegPath, SegNum, AckCounts, Acks), + State1 = append_acks_to_segment(SegNum, Acks, State), JAckCount1 = JAckCount - length(Acks), State2 = State1 #qistate { journal_ack_dict = dict:erase(SegNum, JAckDict), - journal_ack_count = JAckCount1, - seg_ack_counts = AckCounts1 }, + journal_ack_count = JAckCount1 }, if JAckCount1 == 0 -> - {ok, 0} = file:position(JournalHdl, 0), - ok = file:truncate(JournalHdl), - {false, State2}; + {Hdl, State3 = #qistate { hc_state = HCState }} = + get_journal_handle(State2), + {ok, HCState1} = + horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState), + {ok, HCState2} = + horrendously_dumb_file_handle_cache:truncate(Hdl, HCState1), + {false, State3 #qistate { hc_state = HCState2 }}; JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT -> flush_journal(State2); true -> {true, State2} end. 
-read_segment_entries(InitSeqId, State = - #qistate { dir = Dir, journal_ack_dict = JAckDict }) -> +read_segment_entries(InitSeqId, State) -> {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - SegPath = seg_num_to_path(Dir, SegNum), - {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, JAckDict), + {SDict, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State), %% deliberately sort the list desc, because foldl will reverse it RelSeqs = rev_sort(dict:fetch_keys(SDict)), {lists:foldl(fun (RelSeq, Acc) -> @@ -273,7 +265,7 @@ read_segment_entries(InitSeqId, State = [ {MsgId, reconstruct_seq_id(SegNum, RelSeq), IsPersistent, IsDelivered} | Acc] end, [], RelSeqs), - State}. + State1}. next_segment_boundary(SeqId) -> {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), @@ -282,27 +274,31 @@ next_segment_boundary(SeqId) -> segment_size() -> ?SEGMENT_ENTRIES_COUNT. -find_lowest_seq_id_seg_and_next_seq_id( - State = #qistate { dir = Dir, journal_ack_dict = JAckDict }) -> - SegNumsPaths = all_segment_nums_paths(Dir), +find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> + SegNums = all_segment_nums(Dir), %% We don't want the lowest seq_id, merely the seq_id of the start %% of the lowest segment. That seq_id may not actually exist, but %% that's fine. The important thing is that the segment exists and %% the seq_id reported is on a segment boundary. 
LowSeqIdSeg = - case SegNumsPaths of + case SegNums of [] -> 0; - _ -> {SegNum1, _SegPath1} = lists:min(SegNumsPaths), - reconstruct_seq_id(SegNum1, 0) + _ -> reconstruct_seq_id(lists:min(SegNums), 0) end, {NextSeqId, State1} = - case SegNumsPaths of + case SegNums of [] -> {0, State}; - _ -> {SegNum2, SegPath2} = lists:max(SegNumsPaths), - State2 = close_file_handle_for_seg(SegNum2, State), - {_SDict, _AckCount, HighRelSeq} = - load_segment(SegNum2, SegPath2, JAckDict), - {1 + reconstruct_seq_id(SegNum2, HighRelSeq), State2} + _ -> SegNum2 = lists:max(SegNums), + {SDict, AckCount, HighRelSeq, State2} = + load_segment(SegNum2, State), + NextSeqId1 = reconstruct_seq_id(SegNum2, HighRelSeq), + NextSeqId2 = + case 0 == AckCount andalso 0 == HighRelSeq andalso + 0 == dict:size(SDict) of + true -> NextSeqId1; + false -> NextSeqId1 + 1 + end, + {NextSeqId2, State2} end, {LowSeqIdSeg, NextSeqId, State1}. @@ -341,6 +337,7 @@ start_msg_store(DurableQueues) -> end, TransientDirs), ok. + %%---------------------------------------------------------------------------- %% Minor Helpers %%---------------------------------------------------------------------------- @@ -357,28 +354,56 @@ queues_dir() -> rev_sort(List) -> lists:sort(fun (A, B) -> B < A end, List). -close_file_handle_for_seg(_SegNum, - State = #qistate { cur_seg_num = undefined }) -> - State; -close_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum, - cur_seg_hdl = Hdl }) -> - ok = file:sync(Hdl), - ok = file:close(Hdl), - State #qistate { cur_seg_num = undefined, cur_seg_hdl = undefined }; -close_file_handle_for_seg(_SegNum, State) -> - State. 
- -get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum, - cur_seg_hdl = Hdl }) -> - {Hdl, State}; -get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) -> - State1 = #qistate { dir = Dir } = - close_file_handle_for_seg(CurSegNum, State), - {ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum), - [binary, raw, read, write, - {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]), - {ok, _} = file:position(Hdl, {eof, 0}), - {Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}. +get_journal_handle(State = #qistate { dir = Dir }) -> + Path = filename:join(Dir, ?ACK_JOURNAL_FILENAME), + Mode = [raw, binary, delayed_write, write, read], + get_handle(journal, Path, Mode, State). + +get_seg_handle(SegNum, State = #qistate { dir = Dir }) -> + get_handle(SegNum, seg_num_to_path(Dir, SegNum), + [binary, raw, read, write, + {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}], + State). + +get_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) -> + State1 = #qistate { hc_state = HCState, + seg_num_handles = SegHdls1 } = + case dict:size(SegHdls) > 10 of + true -> close_all_handles(State); + false -> State + end, + case dict:find(Key, SegHdls1) of + {ok, Hdl} -> {Hdl, State1}; + error -> + {{ok, Hdl}, HCState1} = + horrendously_dumb_file_handle_cache:open(Path, Mode, [], + HCState), + {Hdl, State1 #qistate { + hc_state = HCState1, + seg_num_handles = dict:store(Key, Hdl, SegHdls1) }} + end. + +close_handle(Key, State = #qistate { hc_state = HCState, + seg_num_handles = SegHdls }) -> + case dict:find(Key, SegHdls) of + {ok, Hdl} -> + {ok, HCState1} = + horrendously_dumb_file_handle_cache:close(Hdl, HCState), + State #qistate { hc_state = HCState1, + seg_num_handles = dict:erase(Key, SegHdls) }; + error -> State + end. 
+ +close_all_handles(State = #qistate { hc_state = HCState, + seg_num_handles = SegHdls }) -> + HCState1 = + dict:fold( + fun (_Key, Ref, HCStateN) -> + {ok, HCStateM} = + horrendously_dumb_file_handle_cache:close(Ref, HCStateN), + HCStateM + end, HCState, SegHdls), + State #qistate { hc_state = HCState1, seg_num_handles = dict:new() }. bool_to_int(true ) -> 1; bool_to_int(false) -> 0. @@ -403,6 +428,7 @@ add_ack_to_ack_dict(SeqId, ADict) -> {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict). + %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- @@ -430,79 +456,88 @@ queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Entries], {MsgId, bool_to_int(IsPersistent), {Entries, N - 1, LowSeqIdSeg, State, QueueNames}}. + %%---------------------------------------------------------------------------- %% Startup Functions %%---------------------------------------------------------------------------- -all_segment_nums_paths(Dir) -> - [{list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, - SegName)), filename:join(Dir, SegName)} +all_segment_nums(Dir) -> + [list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]. 
-find_ack_counts_and_deliver_transient_msgs(Dir) -> - SegNumsPaths = all_segment_nums_paths(Dir), - lists:foldl( - fun ({SegNum, SegPath}, {TotalMsgCount, AckCounts, TransientADict}) -> - {SDict, AckCount, _HighRelSeq} = - load_segment(SegNum, SegPath, dict:new()), - TransientMsgsAcks = deliver_transient(SegPath, SDict), - %% ignore TransientMsgsAcks in AckCounts1 and - %% TotalMsgCount1 because the TransientMsgsAcks fall - %% through into scatter_journal at which point the - %% AckCounts and TotalMsgCount will be correctly - %% adjusted. - TotalMsgCount1 = TotalMsgCount + dict:size(SDict), - AckCounts1 = case AckCount of - 0 -> AckCounts; - N -> dict:store(SegNum, N, AckCounts) - end, - TransientADict1 = - case TransientMsgsAcks of - [] -> TransientADict; - _ -> dict:store(SegNum, TransientMsgsAcks, TransientADict) - end, - {TotalMsgCount1, AckCounts1, TransientADict1} - end, {0, dict:new(), dict:new()}, SegNumsPaths). +find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) -> + SegNums = all_segment_nums(Dir), + {TotalMsgCount, State1} = + lists:foldl( + fun (SegNum, {TotalMsgCount1, StateN}) -> + {SDict, AckCount, _HighRelSeq, StateM} = + load_segment(SegNum, StateN), + {TransientMsgsAcks, StateL = + #qistate { seg_ack_counts = AckCounts, + journal_ack_dict = JAckDict }} = + deliver_transient(SegNum, SDict, StateM), + %% ignore TransientMsgsAcks in AckCounts and + %% JAckDict1 because the TransientMsgsAcks fall + %% through into scatter_journal at which point the + %% AckCounts and TotalMsgCount will be correctly + %% adjusted. 
+ TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict), + AckCounts1 = case AckCount of + 0 -> AckCounts; + N -> dict:store(SegNum, N, AckCounts) + end, + JAckDict1 = + case TransientMsgsAcks of + [] -> JAckDict; + _ -> dict:store(SegNum, TransientMsgsAcks, JAckDict) + end, + {TotalMsgCount2, + StateL #qistate { seg_ack_counts = AckCounts1, + journal_ack_dict = JAckDict1 }} + end, {0, State}, SegNums), + {TotalMsgCount, State1}. -scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict) -> +scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) -> JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME), - case file:open(JournalPath, [read, read_ahead, raw, binary]) of - {error, enoent} -> {TotalMsgCount, AckCounts}; - {ok, Hdl} -> - %% ADict may well contain duplicates. However, this is ok, - %% due to the use of sets in replay_journal_acks_to_segment - ADict = load_journal(Hdl, TransientADict), - ok = file:close(Hdl), - {TotalMsgCount1, AckCounts1, _Dir} = - dict:fold(fun replay_journal_acks_to_segment/3, - {TotalMsgCount, AckCounts, Dir}, ADict), - ok = file:delete(JournalPath), - {TotalMsgCount1, AckCounts1} - end. - -load_journal(Hdl, ADict) -> - case file:read(Hdl, ?SEQ_BYTES) of - {ok, <<SeqId:?SEQ_BITS>>} -> - load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict)); - _ErrOrEoF -> ADict + {Hdl, State1 = #qistate { hc_state = HCState, + journal_ack_dict = JAckDict }} = + get_journal_handle(State), + %% ADict may well contain duplicates. However, this is ok, due to + %% the use of sets in replay_journal_acks_to_segment + {ADict, HCState1} = load_journal(Hdl, JAckDict, HCState), + State2 = close_handle(journal, State1 #qistate { hc_state = HCState1 }), + {TotalMsgCount1, State3} = + dict:fold(fun replay_journal_acks_to_segment/3, + {TotalMsgCount, State2}, ADict), + ok = file:delete(JournalPath), + {TotalMsgCount1, State3 #qistate { journal_ack_dict = dict:new() }}. 
+ +load_journal(Hdl, ADict, HCState) -> + case horrendously_dumb_file_handle_cache:read( + Hdl, cur, ?SEQ_BYTES, HCState) of + {{ok, <<SeqId:?SEQ_BITS>>}, HCState1} -> + load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict), HCState1); + {_ErrOrEoF, HCState1} -> {ADict, HCState1} end. replay_journal_acks_to_segment(_, [], Acc) -> Acc; -replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, AckCounts, Dir}) -> - SegPath = seg_num_to_path(Dir, SegNum), +replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) -> %% supply empty dict so that we get all msgs in SDict that have %% not been acked in the segment file itself - {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()), + {SDict, _AckCount, _HighRelSeq, State1} = + load_segment(SegNum, State #qistate { journal_ack_dict = dict:new() }), ValidRelSeqIds = dict:fetch_keys(SDict), ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds), sets:from_list(Acks))), %% ValidAcks will not contain any duplicates at this point. + State2 = + State1 #qistate { journal_ack_dict = State #qistate.journal_ack_dict }, {TotalMsgCount - length(ValidAcks), - append_acks_to_segment(SegPath, SegNum, AckCounts, ValidAcks), Dir}. + append_acks_to_segment(SegNum, ValidAcks, State2)}. -deliver_transient(SegPath, SDict) -> +deliver_transient(SegNum, SDict, State) -> {AckMe, DeliverMe} = dict:fold( fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) -> @@ -512,27 +547,36 @@ deliver_transient(SegPath, SDict) -> (RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) -> {[RelSeq | AckMeAcc], DeliverMeAcc} end, {[], []}, SDict), - {ok, Hdl} = file:open(SegPath, [binary, raw, read, write, - {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]), - {ok, _} = file:position(Hdl, {eof, 0}), - ok = file:write(Hdl, [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]), - ok = file:sync(Hdl), - ok = file:close(Hdl), - AckMe. 
+ {Hdl, State1} = get_seg_handle(SegNum, State), + {ok, HCState} = horrendously_dumb_file_handle_cache:write( + Hdl, eof, + [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ], + State1 #qistate.hc_state), + {AckMe, State1 #qistate { hc_state = HCState }}. + %%---------------------------------------------------------------------------- %% Loading Segments %%---------------------------------------------------------------------------- -load_segment(SegNum, SegPath, JAckDict) -> - case file:open(SegPath, [raw, binary, read, - {read_ahead, ?SEGMENT_TOTAL_SIZE}]) of - {error, enoent} -> {dict:new(), 0, 0}; - {ok, Hdl} -> - {SDict, AckCount, HighRelSeq} = - load_segment_entries(Hdl, dict:new(), 0, 0), - ok = file:close(Hdl), +load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls, + dir = Dir }) -> + SegmentExists = case dict:find(SegNum, SegHdls) of + {ok, _} -> true; + error -> filelib:is_file(seg_num_to_path(Dir, SegNum)) + end, + case SegmentExists of + false -> {dict:new(), 0, 0, State}; + true -> + {Hdl, State1 = #qistate { hc_state = HCState, + journal_ack_dict = JAckDict }} = + get_seg_handle(SegNum, State), + {ok, HCState1} = + horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState), + + {SDict, AckCount, HighRelSeq, HCState2} = + load_segment_entries(Hdl, dict:new(), 0, 0, HCState1), RelSeqs = case dict:find(SegNum, JAckDict) of {ok, RelSeqs1} -> RelSeqs1; error -> [] @@ -541,30 +585,35 @@ load_segment(SegNum, SegPath, JAckDict) -> lists:foldl(fun (RelSeq, {SDict2, AckCount2}) -> {dict:erase(RelSeq, SDict2), AckCount2 + 1} end, {SDict, AckCount}, RelSeqs), - {SDict1, AckCount1, HighRelSeq} + {SDict1, AckCount1, HighRelSeq, + State1 #qistate { hc_state = HCState2 }} end. 
-load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) -> - case file:read(Hdl, 1) of - {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} -> - {ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1), +load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) -> + case horrendously_dumb_file_handle_cache:read(Hdl, cur, 1, HCState) of + {{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>}, HCState1} -> + {{ok, LSB}, HCState2} = + horrendously_dumb_file_handle_cache:read( + Hdl, cur, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq), - load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq); - {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} -> + load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq, HCState2); + {{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>}, HCState1} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} = - file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), + {{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>}, HCState2} = + horrendously_dumb_file_handle_cache:read( + Hdl, cur, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1), <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>, HighRelSeq1 = lists:max([RelSeq, HighRelSeq]), load_segment_entries( Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum}, - SDict), AckCount, HighRelSeq1); - _ErrOrEoF -> {SDict, AckCount, HighRelSeq} + SDict), AckCount, HighRelSeq1, HCState2); + {_ErrOrEoF, HCState1} -> + {SDict, AckCount, HighRelSeq, HCState1} end. 
deliver_or_ack_msg(SDict, AckCount, RelSeq) -> @@ -580,37 +629,42 @@ deliver_or_ack_msg(SDict, AckCount, RelSeq) -> %% Appending Acks to Segments %%---------------------------------------------------------------------------- -append_acks_to_segment(SegPath, SegNum, AckCounts, Acks) -> +append_acks_to_segment(SegNum, Acks, + State = #qistate { seg_ack_counts = AckCounts }) -> AckCount = case dict:find(SegNum, AckCounts) of {ok, AckCount1} -> AckCount1; error -> 0 end, - case append_acks_to_segment(SegPath, AckCount, Acks) of - 0 -> AckCounts; - ?SEGMENT_ENTRIES_COUNT -> dict:erase(SegNum, AckCounts); - AckCount2 -> dict:store(SegNum, AckCount2, AckCounts) + case append_acks_to_segment(SegNum, AckCount, Acks, State) of + {0, State1} -> State1; + {?SEGMENT_ENTRIES_COUNT, + State1 = #qistate { seg_ack_counts = AckCounts1 }} -> + State1 #qistate { seg_ack_counts = dict:erase(SegNum, AckCounts1) }; + {AckCount2, State1 = #qistate { seg_ack_counts = AckCounts1 }} -> + State1 #qistate { seg_ack_counts = dict:store(SegNum, AckCount2, + AckCounts1) } end. 
-append_acks_to_segment(SegPath, AckCount, Acks) +append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir }) when length(Acks) + AckCount == ?SEGMENT_ENTRIES_COUNT -> - ok = case file:delete(SegPath) of + State1 = close_handle(SegNum, State), + ok = case file:delete(seg_num_to_path(Dir, SegNum)) of ok -> ok; {error, enoent} -> ok end, - ?SEGMENT_ENTRIES_COUNT; -append_acks_to_segment(SegPath, AckCount, Acks) + {?SEGMENT_ENTRIES_COUNT, State1}; +append_acks_to_segment(SegNum, AckCount, Acks, State) when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT -> - {ok, Hdl} = file:open(SegPath, [raw, binary, read, write, - {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]), - {ok, _} = file:position(Hdl, {eof, 0}), - AckCount1 = + {Hdl, State1} = get_seg_handle(SegNum, State), + {AckCount1, HCState} = lists:foldl( - fun (RelSeq, AckCount2) -> - ok = file:write(Hdl, - <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>), - AckCount2 + 1 - end, AckCount, Acks), - ok = file:sync(Hdl), - ok = file:close(Hdl), - AckCount1. + fun (RelSeq, {AckCount2, HCStateN}) -> + {ok, HCStateM} = + horrendously_dumb_file_handle_cache:write( + Hdl, eof, + <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, HCStateN), + {AckCount2 + 1, HCStateM} + end, {AckCount, State1 #qistate.hc_state}, Acks), + {ok, HCState1} = horrendously_dumb_file_handle_cache:sync(Hdl, HCState), + {AckCount1, State1 #qistate { hc_state = HCState1 }}. |
