author     Matthew Sackman <matthew@lshift.net>    2009-10-20 17:28:06 +0100
committer  Matthew Sackman <matthew@lshift.net>    2009-10-20 17:28:06 +0100
commit     92addf3ac62337fdf3c25b058d5f1f8a1aff1e48 (patch)
tree       e6e3a756f1ba6eca334c1f7672369b70ba65dd0a /src
parent     588d183c81a77b93543ade52583d32b8759ee78e (diff)
download   rabbitmq-server-git-92addf3ac62337fdf3c25b058d5f1f8a1aff1e48.tar.gz
well, it's about 4 times slower, but the dumb fhc works on the queue index
Diffstat (limited to 'src')
-rw-r--r--  src/horrendously_dumb_file_handle_cache.erl  263
-rw-r--r--  src/rabbit_queue_index.erl                    476
2 files changed, 528 insertions(+), 211 deletions(-)
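
The commit replaces the queue index's direct file:read/write/sync calls with a "horrendously dumb" file handle cache that threads an explicit cache state through every operation. A minimal sketch of the calling convention the call sites below follow; the helper name is illustrative and not part of the commit:

append_and_sync(Ref, Bin, HCState0) ->
    %% write at the end of the file, then force the bytes to disk; each
    %% call returns a new cache state that must replace the old one
    {ok, HCState1} =
        horrendously_dumb_file_handle_cache:write(Ref, eof, Bin, HCState0),
    {ok, HCState2} = horrendously_dumb_file_handle_cache:sync(Ref, HCState1),
    HCState2.
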
diff --git a/src/horrendously_dumb_file_handle_cache.erl b/src/horrendously_dumb_file_handle_cache.erl
new file mode 100644
index 0000000000..5b2bcb6cce
--- /dev/null
+++ b/src/horrendously_dumb_file_handle_cache.erl
@@ -0,0 +1,263 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(horrendously_dumb_file_handle_cache).
+
+-export([init/0, open/4, close/2, release/2, read/4, write/4, sync/2,
+ position/3, truncate/2, with_file_handle_at/4, sync_to_offset/3]).
+
+-record(hcstate,
+ { ref_entry, path_mode_ref }).
+
+-record(entry,
+ { hdl,
+ current_offset,
+ last_sync_offset,
+ is_dirty,
+ is_append,
+ path_mode_key }).
+
+init() ->
+ #hcstate { ref_entry = dict:new(),
+ path_mode_ref = dict:new() }.
+
+open(Path, Mode, [] = _ExtraOptions,
+ State = #hcstate { ref_entry = RefEntry, path_mode_ref = PathModeRef }) ->
+ Mode1 = lists:usort(Mode),
+ Path1 = filename:absname(Path),
+ Key = {Path1, Mode1},
+ case dict:find(Key, PathModeRef) of
+ {ok, Ref} -> {{ok, Ref}, State};
+ error ->
+ case file:open(Path1, Mode1) of
+ {ok, Hdl} ->
+ Ref = make_ref(),
+ PathModeRef1 = dict:store(Key, Ref, PathModeRef),
+ Entry = #entry { hdl = Hdl, current_offset = 0,
+ last_sync_offset = 0, is_dirty = false,
+ is_append = lists:member(append, Mode1),
+ path_mode_key = Key },
+ RefEntry1 = dict:store(Ref, Entry, RefEntry),
+ {{ok, Ref}, State #hcstate { ref_entry = RefEntry1,
+ path_mode_ref = PathModeRef1 }};
+ {error, Error} ->
+ {{error, Error}, State}
+ end
+ end.
+
+close(Ref, State = #hcstate { ref_entry = RefEntry,
+ path_mode_ref = PathModeRef }) ->
+ {ok,
+ case dict:find(Ref, RefEntry) of
+ {ok, #entry { hdl = Hdl, is_dirty = IsDirty, path_mode_key = Key }} ->
+ ok = case IsDirty of
+ true -> file:sync(Hdl);
+ false -> ok
+ end,
+ ok = file:close(Hdl),
+ State #hcstate { ref_entry = dict:erase(Ref, RefEntry),
+ path_mode_ref = dict:erase(Key, PathModeRef) };
+ error -> State
+ end}.
+
+release(_Ref, State) -> %% noop for the time being
+ {ok, State}.
+
+read(Ref, Offset, Count, State = #hcstate { ref_entry = RefEntry }) ->
+ case dict:find(Ref, RefEntry) of
+ {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
+ last_sync_offset = LastSyncOffset,
+ is_dirty = IsDirty }} ->
+ NewOffset = Count +
+ case Offset of
+ cur -> OldOffset;
+ _ -> {ok, RealOff} = file:position(Hdl, Offset),
+ RealOff
+ end,
+ {IsDirty1, LastSyncOffset1} =
+ case IsDirty andalso NewOffset > LastSyncOffset of
+ true -> ok = file:sync(Hdl),
+ {false, lists:max([NewOffset, OldOffset])};
+ false -> {IsDirty, LastSyncOffset}
+ end,
+ Entry1 = Entry #entry { current_offset = NewOffset,
+ last_sync_offset = LastSyncOffset1,
+ is_dirty = IsDirty1 },
+ State1 = State #hcstate { ref_entry = dict:store(Ref, Entry1,
+ RefEntry) },
+ {file:read(Hdl, Count), State1};
+ error -> {{error, not_open}, State}
+ end.
+
+%% if the file was opened in append mode, then Offset is ignored, as
+%% it would only affect the read head for this file.
+write(Ref, Offset, Data, State = #hcstate { ref_entry = RefEntry }) ->
+ case dict:find(Ref, RefEntry) of
+ {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
+ is_append = IsAppend }} ->
+ NewOffset =
+ case IsAppend of
+ true ->
+ OldOffset;
+ false ->
+ size_of_write_data(Data) +
+ case Offset of
+ cur -> OldOffset;
+ _ -> {ok, RealOff} = file:position(Hdl, Offset),
+ RealOff
+ end
+ end,
+ Entry1 = Entry #entry { current_offset = NewOffset,
+ is_dirty = true },
+ State1 = State #hcstate { ref_entry = dict:store(Ref, Entry1,
+ RefEntry) },
+ {file:write(Hdl, Data), State1};
+ error -> {{error, not_open}, State}
+ end.
+
+sync(Ref, State = #hcstate { ref_entry = RefEntry }) ->
+ case dict:find(Ref, RefEntry) of
+ {ok, Entry = #entry { hdl = Hdl, current_offset = Offset,
+ last_sync_offset = LastSyncOffset,
+ is_dirty = true }} ->
+ SyncOffset = lists:max([Offset, LastSyncOffset]),
+ ok = file:sync(Hdl),
+ Entry1 = Entry #entry { last_sync_offset = SyncOffset,
+ is_dirty = false },
+ {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
+ RefEntry) }};
+ {ok, _Entry_not_dirty} ->
+ {ok, State};
+ error -> {{error, not_open}, State}
+ end.
+
+position(Ref, NewOffset, State = #hcstate { ref_entry = RefEntry }) ->
+ case dict:find(Ref, RefEntry) of
+ {ok, #entry { current_offset = NewOffset }} ->
+ {ok, State};
+ {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
+ last_sync_offset = LastSyncOffset,
+ is_dirty = IsDirty }} ->
+ {ok, RealOff} = file:position(Hdl, NewOffset),
+ {IsDirty1, LastSyncOffset1} =
+ case {IsDirty, RealOff > LastSyncOffset} of
+ {true, true} ->
+ ok = file:sync(Hdl),
+ {false, lists:max([RealOff, OldOffset])};
+ {false, true} ->
+ {false, RealOff};
+ _ ->
+ {IsDirty, LastSyncOffset}
+ end,
+ Entry1 = Entry #entry { current_offset = RealOff,
+ last_sync_offset = LastSyncOffset1,
+ is_dirty = IsDirty1 },
+ {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
+ RefEntry) }};
+ error ->
+ {{error, not_open}, State}
+ end.
+
+truncate(Ref, State = #hcstate { ref_entry = RefEntry }) ->
+ case dict:find(Ref, RefEntry) of
+ {ok, Entry = #entry { hdl = Hdl, current_offset = Offset,
+ last_sync_offset = LastSyncOffset,
+ is_dirty = IsDirty }} ->
+ ok = case IsDirty of
+ true -> file:sync(Hdl);
+ false -> ok
+ end,
+ LastSyncOffset1 = lists:min([Offset, LastSyncOffset]),
+ ok = file:truncate(Hdl),
+ Entry1 = Entry #entry { last_sync_offset = LastSyncOffset1,
+ is_dirty = false },
+ {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
+ RefEntry) }};
+ error -> {{error, not_open}, State}
+ end.
+
+with_file_handle_at(Ref, Offset, Fun,
+ State = #hcstate { ref_entry = RefEntry }) ->
+ case dict:find(Ref, RefEntry) of
+ {ok, Entry = #entry { hdl = Hdl, current_offset = OldOffset,
+ last_sync_offset = LastSyncOffset,
+ is_dirty = IsDirty }} ->
+ Offset1 =
+ case Offset of
+ cur -> OldOffset;
+ OldOffset -> OldOffset;
+ _ -> {ok, RealOff} = file:position(Hdl, Offset),
+ RealOff
+ end,
+ LastSyncOffset1 =
+ case IsDirty of
+ true -> ok = file:sync(Hdl),
+ lists:max([Offset1, OldOffset]);
+ false -> LastSyncOffset
+ end,
+ {Offset2, Result} = Fun(Hdl),
+ Entry1 = Entry #entry { current_offset = Offset2,
+ last_sync_offset = LastSyncOffset1,
+ is_dirty = true },
+ State1 = State #hcstate { ref_entry = dict:store(Ref, Entry1,
+ RefEntry) },
+ {Result, State1};
+ error -> {{error, not_open}, State}
+ end.
+
+sync_to_offset(Ref, Offset, State = #hcstate { ref_entry = RefEntry }) ->
+ case dict:find(Ref, RefEntry) of
+ {ok, Entry = #entry { hdl = Hdl, last_sync_offset = LastSyncOffset,
+ current_offset = CurOffset, is_dirty = true }}
+ when (Offset =:= cur andalso CurOffset > LastSyncOffset)
+ orelse (Offset > LastSyncOffset) ->
+ ok = file:sync(Hdl),
+ LastSyncOffset1 =
+ case Offset of
+ cur -> lists:max([LastSyncOffset, CurOffset]);
+ _ -> lists:max([LastSyncOffset, CurOffset, Offset])
+ end,
+ Entry1 = Entry #entry { last_sync_offset = LastSyncOffset1,
+ is_dirty = false },
+ {ok, State #hcstate { ref_entry = dict:store(Ref, Entry1,
+ RefEntry) }};
+ error -> {{error, not_open}, State}
+ end.
+
+size_of_write_data(Data) ->
+ size_of_write_data(Data, 0).
+
+size_of_write_data([], Acc) ->
+ Acc;
+size_of_write_data([A|B], Acc) ->
+ size_of_write_data(B, size_of_write_data(A, Acc));
+size_of_write_data(Bin, Acc) when is_binary(Bin) ->
+ size(Bin) + Acc.
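
Taken on its own, the module above can be exercised end to end roughly as follows; the path and payload are made up for illustration and nothing here appears in the commit:

fhc_demo() ->
    HC0 = horrendously_dumb_file_handle_cache:init(),
    {{ok, Ref}, HC1} = horrendously_dumb_file_handle_cache:open(
                         "/tmp/fhc_demo.dat", [raw, binary, read, write],
                         [], HC0),
    %% append a payload, then sync everything up to the current offset
    {ok, HC2} = horrendously_dumb_file_handle_cache:write(Ref, eof,
                                                          <<"hello">>, HC1),
    {ok, HC3} = horrendously_dumb_file_handle_cache:sync_to_offset(Ref, cur,
                                                                   HC2),
    %% reposition to the start of the file and read the payload back
    {{ok, Data}, HC4} = horrendously_dumb_file_handle_cache:read(Ref, 0, 5,
                                                                 HC3),
    {ok, _HC5} = horrendously_dumb_file_handle_cache:close(Ref, HC4),
    Data.

Because refs are keyed on the absolute path plus the sorted open mode, a second open of the same file in the same mode returns the existing ref rather than a new OS file handle.
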
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3471913f65..3a21c23625 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -108,11 +108,10 @@
-record(qistate,
{ dir,
- cur_seg_num,
- cur_seg_hdl,
+ seg_num_handles,
+ hc_state,
journal_ack_count,
journal_ack_dict,
- journal_handle,
seg_ack_counts
}).
@@ -124,14 +123,11 @@
-type(msg_id() :: binary()).
-type(seq_id() :: integer()).
--type(int_or_undef() :: integer() | 'undefined').
--type(io_dev_or_undef() :: io_device() | 'undefined').
-type(qistate() :: #qistate { dir :: file_path(),
- cur_seg_num :: int_or_undef(),
- cur_seg_hdl :: io_dev_or_undef(),
+ seg_num_handles :: dict(),
+ hc_state :: any(),
journal_ack_count :: integer(),
journal_ack_dict :: dict(),
- journal_handle :: io_device(),
seg_ack_counts :: dict()
}).
@@ -154,38 +150,30 @@
-endif.
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
init(Name) ->
+ HCState = horrendously_dumb_file_handle_cache:init(),
StrName = queue_name_to_dir_name(Name),
Dir = filename:join(queues_dir(), StrName),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- {TotalMsgCount, AckCounts, TransientADict} =
- find_ack_counts_and_deliver_transient_msgs(Dir),
- {TotalMsgCount1, AckCounts1} =
- scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict),
- {ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME),
- [raw, binary, delayed_write, write, read]),
- {TotalMsgCount1, #qistate { dir = Dir,
- cur_seg_num = undefined,
- cur_seg_hdl = undefined,
- journal_ack_count = 0,
- journal_ack_dict = dict:new(),
- journal_handle = JournalHdl,
- seg_ack_counts = AckCounts1
- }}.
-
-terminate(State = #qistate { journal_handle = undefined }) ->
- State;
-terminate(State) ->
- State1 = #qistate { cur_seg_num = SegNum } = full_flush_journal(State),
- State2 = #qistate { journal_handle = JournalHdl } =
- close_file_handle_for_seg(SegNum, State1),
- ok = file:sync(JournalHdl),
- ok = file:close(JournalHdl),
- State2 #qistate { journal_handle = undefined }.
+ State = #qistate { dir = Dir,
+ seg_num_handles = dict:new(),
+ hc_state = HCState,
+ journal_ack_count = 0,
+ journal_ack_dict = dict:new(),
+ seg_ack_counts = dict:new() },
+ {TotalMsgCount, State1} = find_ack_counts_and_deliver_transient_msgs(State),
+ scatter_journal(TotalMsgCount, State1).
+
+terminate(State = #qistate { seg_num_handles = SegHdls }) ->
+ case 0 == dict:size(SegHdls) of
+ true -> State;
+ false -> close_all_handles(full_flush_journal(State))
+ end.
terminate_and_erase(State) ->
State1 = terminate(State),
@@ -196,35 +184,43 @@ write_published(MsgId, SeqId, IsPersistent, State)
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- {Hdl, State1} = get_file_handle_for_seg(SegNum, State),
- ok = file:write(Hdl,
- <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS, MsgId/binary>>),
- State1.
+ {Hdl, State1} = get_seg_handle(SegNum, State),
+ {ok, HCState} = horrendously_dumb_file_handle_cache:write(
+ Hdl, eof,
+ <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, MsgId/binary>>,
+ State1 #qistate.hc_state),
+ State1 #qistate { hc_state = HCState }.
write_delivered(SeqId, State) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- {Hdl, State1} = get_file_handle_for_seg(SegNum, State),
- ok = file:write(Hdl,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>),
- State1.
-
-write_acks(SeqIds, State = #qistate { journal_handle = JournalHdl,
- journal_ack_dict = JAckDict,
+ {Hdl, State1} = get_seg_handle(SegNum, State),
+ {ok, HCState} = horrendously_dumb_file_handle_cache:write(
+ Hdl, eof,
+ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ State1 #qistate.hc_state),
+ State1 #qistate { hc_state = HCState }.
+
+write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict,
journal_ack_count = JAckCount }) ->
- {JAckDict1, JAckCount1} =
+ {Hdl, State1} = get_journal_handle(State),
+ {JAckDict1, JAckCount1, HCState} =
lists:foldl(
- fun (SeqId, {JAckDict2, JAckCount2}) ->
- ok = file:write(JournalHdl, <<SeqId:?SEQ_BITS>>),
- {add_ack_to_ack_dict(SeqId, JAckDict2), JAckCount2 + 1}
- end, {JAckDict, JAckCount}, SeqIds),
- State1 = State #qistate { journal_ack_dict = JAckDict1,
- journal_ack_count = JAckCount1 },
+ fun (SeqId, {JAckDict2, JAckCount2, HCStateN}) ->
+ {ok, HCStateM} =
+ horrendously_dumb_file_handle_cache:write(
+ Hdl, eof, <<SeqId:?SEQ_BITS>>, HCStateN),
+ {add_ack_to_ack_dict(SeqId, JAckDict2),
+ JAckCount2 + 1, HCStateM}
+ end, {JAckDict, JAckCount, State1 #qistate.hc_state}, SeqIds),
+ State2 = State1 #qistate { journal_ack_dict = JAckDict1,
+ journal_ack_count = JAckCount1,
+ hc_state = HCState },
case JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT of
- true -> full_flush_journal(State1);
- false -> State1
+ true -> full_flush_journal(State2);
+ false -> State2
end.
full_flush_journal(State) ->
@@ -235,36 +231,32 @@ full_flush_journal(State) ->
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
{false, State};
-flush_journal(State = #qistate { journal_handle = JournalHdl,
- journal_ack_dict = JAckDict,
- journal_ack_count = JAckCount,
- seg_ack_counts = AckCounts,
- dir = Dir }) ->
+flush_journal(State = #qistate { journal_ack_dict = JAckDict,
+ journal_ack_count = JAckCount }) ->
[SegNum|_] = dict:fetch_keys(JAckDict),
Acks = dict:fetch(SegNum, JAckDict),
- SegPath = seg_num_to_path(Dir, SegNum),
- State1 = close_file_handle_for_seg(SegNum, State),
- AckCounts1 = append_acks_to_segment(SegPath, SegNum, AckCounts, Acks),
+ State1 = append_acks_to_segment(SegNum, Acks, State),
JAckCount1 = JAckCount - length(Acks),
State2 = State1 #qistate { journal_ack_dict = dict:erase(SegNum, JAckDict),
- journal_ack_count = JAckCount1,
- seg_ack_counts = AckCounts1 },
+ journal_ack_count = JAckCount1 },
if
JAckCount1 == 0 ->
- {ok, 0} = file:position(JournalHdl, 0),
- ok = file:truncate(JournalHdl),
- {false, State2};
+ {Hdl, State3 = #qistate { hc_state = HCState }} =
+ get_journal_handle(State2),
+ {ok, HCState1} =
+ horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState),
+ {ok, HCState2} =
+ horrendously_dumb_file_handle_cache:truncate(Hdl, HCState1),
+ {false, State3 #qistate { hc_state = HCState2 }};
JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
flush_journal(State2);
true ->
{true, State2}
end.
-read_segment_entries(InitSeqId, State =
- #qistate { dir = Dir, journal_ack_dict = JAckDict }) ->
+read_segment_entries(InitSeqId, State) ->
{SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
- SegPath = seg_num_to_path(Dir, SegNum),
- {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, JAckDict),
+ {SDict, _AckCount, _HighRelSeq, State1} = load_segment(SegNum, State),
%% deliberately sort the list desc, because foldl will reverse it
RelSeqs = rev_sort(dict:fetch_keys(SDict)),
{lists:foldl(fun (RelSeq, Acc) ->
@@ -273,7 +265,7 @@ read_segment_entries(InitSeqId, State =
[ {MsgId, reconstruct_seq_id(SegNum, RelSeq),
IsPersistent, IsDelivered} | Acc]
end, [], RelSeqs),
- State}.
+ State1}.
next_segment_boundary(SeqId) ->
{SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -282,27 +274,31 @@ next_segment_boundary(SeqId) ->
segment_size() ->
?SEGMENT_ENTRIES_COUNT.
-find_lowest_seq_id_seg_and_next_seq_id(
- State = #qistate { dir = Dir, journal_ack_dict = JAckDict }) ->
- SegNumsPaths = all_segment_nums_paths(Dir),
+find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) ->
+ SegNums = all_segment_nums(Dir),
%% We don't want the lowest seq_id, merely the seq_id of the start
%% of the lowest segment. That seq_id may not actually exist, but
%% that's fine. The important thing is that the segment exists and
%% the seq_id reported is on a segment boundary.
LowSeqIdSeg =
- case SegNumsPaths of
+ case SegNums of
[] -> 0;
- _ -> {SegNum1, _SegPath1} = lists:min(SegNumsPaths),
- reconstruct_seq_id(SegNum1, 0)
+ _ -> reconstruct_seq_id(lists:min(SegNums), 0)
end,
{NextSeqId, State1} =
- case SegNumsPaths of
+ case SegNums of
[] -> {0, State};
- _ -> {SegNum2, SegPath2} = lists:max(SegNumsPaths),
- State2 = close_file_handle_for_seg(SegNum2, State),
- {_SDict, _AckCount, HighRelSeq} =
- load_segment(SegNum2, SegPath2, JAckDict),
- {1 + reconstruct_seq_id(SegNum2, HighRelSeq), State2}
+ _ -> SegNum2 = lists:max(SegNums),
+ {SDict, AckCount, HighRelSeq, State2} =
+ load_segment(SegNum2, State),
+ NextSeqId1 = reconstruct_seq_id(SegNum2, HighRelSeq),
+ NextSeqId2 =
+ case 0 == AckCount andalso 0 == HighRelSeq andalso
+ 0 == dict:size(SDict) of
+ true -> NextSeqId1;
+ false -> NextSeqId1 + 1
+ end,
+ {NextSeqId2, State2}
end,
{LowSeqIdSeg, NextSeqId, State1}.
@@ -341,6 +337,7 @@ start_msg_store(DurableQueues) ->
end, TransientDirs),
ok.
+
%%----------------------------------------------------------------------------
%% Minor Helpers
%%----------------------------------------------------------------------------
@@ -357,28 +354,56 @@ queues_dir() ->
rev_sort(List) ->
lists:sort(fun (A, B) -> B < A end, List).
-close_file_handle_for_seg(_SegNum,
- State = #qistate { cur_seg_num = undefined }) ->
- State;
-close_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum,
- cur_seg_hdl = Hdl }) ->
- ok = file:sync(Hdl),
- ok = file:close(Hdl),
- State #qistate { cur_seg_num = undefined, cur_seg_hdl = undefined };
-close_file_handle_for_seg(_SegNum, State) ->
- State.
-
-get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum,
- cur_seg_hdl = Hdl }) ->
- {Hdl, State};
-get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) ->
- State1 = #qistate { dir = Dir } =
- close_file_handle_for_seg(CurSegNum, State),
- {ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum),
- [binary, raw, read, write,
- {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]),
- {ok, _} = file:position(Hdl, {eof, 0}),
- {Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}.
+get_journal_handle(State = #qistate { dir = Dir }) ->
+ Path = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
+ Mode = [raw, binary, delayed_write, write, read],
+ get_handle(journal, Path, Mode, State).
+
+get_seg_handle(SegNum, State = #qistate { dir = Dir }) ->
+ get_handle(SegNum, seg_num_to_path(Dir, SegNum),
+ [binary, raw, read, write,
+ {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}],
+ State).
+
+get_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) ->
+ State1 = #qistate { hc_state = HCState,
+ seg_num_handles = SegHdls1 } =
+ case dict:size(SegHdls) > 10 of
+ true -> close_all_handles(State);
+ false -> State
+ end,
+ case dict:find(Key, SegHdls1) of
+ {ok, Hdl} -> {Hdl, State1};
+ error ->
+ {{ok, Hdl}, HCState1} =
+ horrendously_dumb_file_handle_cache:open(Path, Mode, [],
+ HCState),
+ {Hdl, State1 #qistate {
+ hc_state = HCState1,
+ seg_num_handles = dict:store(Key, Hdl, SegHdls1) }}
+ end.
+
+close_handle(Key, State = #qistate { hc_state = HCState,
+ seg_num_handles = SegHdls }) ->
+ case dict:find(Key, SegHdls) of
+ {ok, Hdl} ->
+ {ok, HCState1} =
+ horrendously_dumb_file_handle_cache:close(Hdl, HCState),
+ State #qistate { hc_state = HCState1,
+ seg_num_handles = dict:erase(Key, SegHdls) };
+ error -> State
+ end.
+
+close_all_handles(State = #qistate { hc_state = HCState,
+ seg_num_handles = SegHdls }) ->
+ HCState1 =
+ dict:fold(
+ fun (_Key, Ref, HCStateN) ->
+ {ok, HCStateM} =
+ horrendously_dumb_file_handle_cache:close(Ref, HCStateN),
+ HCStateM
+ end, HCState, SegHdls),
+ State #qistate { hc_state = HCState1, seg_num_handles = dict:new() }.
bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.
@@ -403,6 +428,7 @@ add_ack_to_ack_dict(SeqId, ADict) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
+
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------
@@ -430,79 +456,88 @@ queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Entries],
{MsgId, bool_to_int(IsPersistent),
{Entries, N - 1, LowSeqIdSeg, State, QueueNames}}.
+
%%----------------------------------------------------------------------------
%% Startup Functions
%%----------------------------------------------------------------------------
-all_segment_nums_paths(Dir) ->
- [{list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
- SegName)), filename:join(Dir, SegName)}
+all_segment_nums(Dir) ->
+ [list_to_integer(
+ lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
|| SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)].
-find_ack_counts_and_deliver_transient_msgs(Dir) ->
- SegNumsPaths = all_segment_nums_paths(Dir),
- lists:foldl(
- fun ({SegNum, SegPath}, {TotalMsgCount, AckCounts, TransientADict}) ->
- {SDict, AckCount, _HighRelSeq} =
- load_segment(SegNum, SegPath, dict:new()),
- TransientMsgsAcks = deliver_transient(SegPath, SDict),
- %% ignore TransientMsgsAcks in AckCounts1 and
- %% TotalMsgCount1 because the TransientMsgsAcks fall
- %% through into scatter_journal at which point the
- %% AckCounts and TotalMsgCount will be correctly
- %% adjusted.
- TotalMsgCount1 = TotalMsgCount + dict:size(SDict),
- AckCounts1 = case AckCount of
- 0 -> AckCounts;
- N -> dict:store(SegNum, N, AckCounts)
- end,
- TransientADict1 =
- case TransientMsgsAcks of
- [] -> TransientADict;
- _ -> dict:store(SegNum, TransientMsgsAcks, TransientADict)
- end,
- {TotalMsgCount1, AckCounts1, TransientADict1}
- end, {0, dict:new(), dict:new()}, SegNumsPaths).
+find_ack_counts_and_deliver_transient_msgs(State = #qistate { dir = Dir }) ->
+ SegNums = all_segment_nums(Dir),
+ {TotalMsgCount, State1} =
+ lists:foldl(
+ fun (SegNum, {TotalMsgCount1, StateN}) ->
+ {SDict, AckCount, _HighRelSeq, StateM} =
+ load_segment(SegNum, StateN),
+ {TransientMsgsAcks, StateL =
+ #qistate { seg_ack_counts = AckCounts,
+ journal_ack_dict = JAckDict }} =
+ deliver_transient(SegNum, SDict, StateM),
+ %% ignore TransientMsgsAcks in AckCounts and
+ %% JAckDict1 because the TransientMsgsAcks fall
+ %% through into scatter_journal at which point the
+ %% AckCounts and TotalMsgCount will be correctly
+ %% adjusted.
+ TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict),
+ AckCounts1 = case AckCount of
+ 0 -> AckCounts;
+ N -> dict:store(SegNum, N, AckCounts)
+ end,
+ JAckDict1 =
+ case TransientMsgsAcks of
+ [] -> JAckDict;
+ _ -> dict:store(SegNum, TransientMsgsAcks, JAckDict)
+ end,
+ {TotalMsgCount2,
+ StateL #qistate { seg_ack_counts = AckCounts1,
+ journal_ack_dict = JAckDict1 }}
+ end, {0, State}, SegNums),
+ {TotalMsgCount, State1}.
-scatter_journal(Dir, TotalMsgCount, AckCounts, TransientADict) ->
+scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) ->
JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
- case file:open(JournalPath, [read, read_ahead, raw, binary]) of
- {error, enoent} -> {TotalMsgCount, AckCounts};
- {ok, Hdl} ->
- %% ADict may well contain duplicates. However, this is ok,
- %% due to the use of sets in replay_journal_acks_to_segment
- ADict = load_journal(Hdl, TransientADict),
- ok = file:close(Hdl),
- {TotalMsgCount1, AckCounts1, _Dir} =
- dict:fold(fun replay_journal_acks_to_segment/3,
- {TotalMsgCount, AckCounts, Dir}, ADict),
- ok = file:delete(JournalPath),
- {TotalMsgCount1, AckCounts1}
- end.
-
-load_journal(Hdl, ADict) ->
- case file:read(Hdl, ?SEQ_BYTES) of
- {ok, <<SeqId:?SEQ_BITS>>} ->
- load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict));
- _ErrOrEoF -> ADict
+ {Hdl, State1 = #qistate { hc_state = HCState,
+ journal_ack_dict = JAckDict }} =
+ get_journal_handle(State),
+ %% ADict may well contain duplicates. However, this is ok, due to
+ %% the use of sets in replay_journal_acks_to_segment
+ {ADict, HCState1} = load_journal(Hdl, JAckDict, HCState),
+ State2 = close_handle(journal, State1 #qistate { hc_state = HCState1 }),
+ {TotalMsgCount1, State3} =
+ dict:fold(fun replay_journal_acks_to_segment/3,
+ {TotalMsgCount, State2}, ADict),
+ ok = file:delete(JournalPath),
+ {TotalMsgCount1, State3 #qistate { journal_ack_dict = dict:new() }}.
+
+load_journal(Hdl, ADict, HCState) ->
+ case horrendously_dumb_file_handle_cache:read(
+ Hdl, cur, ?SEQ_BYTES, HCState) of
+ {{ok, <<SeqId:?SEQ_BITS>>}, HCState1} ->
+ load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict), HCState1);
+ {_ErrOrEoF, HCState1} -> {ADict, HCState1}
end.
replay_journal_acks_to_segment(_, [], Acc) ->
Acc;
-replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, AckCounts, Dir}) ->
- SegPath = seg_num_to_path(Dir, SegNum),
+replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) ->
%% supply empty dict so that we get all msgs in SDict that have
%% not been acked in the segment file itself
- {SDict, _AckCount, _HighRelSeq} = load_segment(SegNum, SegPath, dict:new()),
+ {SDict, _AckCount, _HighRelSeq, State1} =
+ load_segment(SegNum, State #qistate { journal_ack_dict = dict:new() }),
ValidRelSeqIds = dict:fetch_keys(SDict),
ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds),
sets:from_list(Acks))),
%% ValidAcks will not contain any duplicates at this point.
+ State2 =
+ State1 #qistate { journal_ack_dict = State #qistate.journal_ack_dict },
{TotalMsgCount - length(ValidAcks),
- append_acks_to_segment(SegPath, SegNum, AckCounts, ValidAcks), Dir}.
+ append_acks_to_segment(SegNum, ValidAcks, State2)}.
-deliver_transient(SegPath, SDict) ->
+deliver_transient(SegNum, SDict, State) ->
{AckMe, DeliverMe} =
dict:fold(
fun (_RelSeq, {_MsgId, _IsDelivered, true}, Acc) ->
@@ -512,27 +547,36 @@ deliver_transient(SegPath, SDict) ->
(RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) ->
{[RelSeq | AckMeAcc], DeliverMeAcc}
end, {[], []}, SDict),
- {ok, Hdl} = file:open(SegPath, [binary, raw, read, write,
- {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]),
- {ok, _} = file:position(Hdl, {eof, 0}),
- ok = file:write(Hdl, [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ]),
- ok = file:sync(Hdl),
- ok = file:close(Hdl),
- AckMe.
+ {Hdl, State1} = get_seg_handle(SegNum, State),
+ {ok, HCState} = horrendously_dumb_file_handle_cache:write(
+ Hdl, eof,
+ [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ],
+ State1 #qistate.hc_state),
+ {AckMe, State1 #qistate { hc_state = HCState }}.
+
%%----------------------------------------------------------------------------
%% Loading Segments
%%----------------------------------------------------------------------------
-load_segment(SegNum, SegPath, JAckDict) ->
- case file:open(SegPath, [raw, binary, read,
- {read_ahead, ?SEGMENT_TOTAL_SIZE}]) of
- {error, enoent} -> {dict:new(), 0, 0};
- {ok, Hdl} ->
- {SDict, AckCount, HighRelSeq} =
- load_segment_entries(Hdl, dict:new(), 0, 0),
- ok = file:close(Hdl),
+load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
+ dir = Dir }) ->
+ SegmentExists = case dict:find(SegNum, SegHdls) of
+ {ok, _} -> true;
+ error -> filelib:is_file(seg_num_to_path(Dir, SegNum))
+ end,
+ case SegmentExists of
+ false -> {dict:new(), 0, 0, State};
+ true ->
+ {Hdl, State1 = #qistate { hc_state = HCState,
+ journal_ack_dict = JAckDict }} =
+ get_seg_handle(SegNum, State),
+ {ok, HCState1} =
+ horrendously_dumb_file_handle_cache:position(Hdl, 0, HCState),
+
+ {SDict, AckCount, HighRelSeq, HCState2} =
+ load_segment_entries(Hdl, dict:new(), 0, 0, HCState1),
RelSeqs = case dict:find(SegNum, JAckDict) of
{ok, RelSeqs1} -> RelSeqs1;
error -> []
@@ -541,30 +585,35 @@ load_segment(SegNum, SegPath, JAckDict) ->
lists:foldl(fun (RelSeq, {SDict2, AckCount2}) ->
{dict:erase(RelSeq, SDict2), AckCount2 + 1}
end, {SDict, AckCount}, RelSeqs),
- {SDict1, AckCount1, HighRelSeq}
+ {SDict1, AckCount1, HighRelSeq,
+ State1 #qistate { hc_state = HCState2 }}
end.
-load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) ->
- case file:read(Hdl, 1) of
- {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
- {ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
+load_segment_entries(Hdl, SDict, AckCount, HighRelSeq, HCState) ->
+ case horrendously_dumb_file_handle_cache:read(Hdl, cur, 1, HCState) of
+ {{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>}, HCState1} ->
+ {{ok, LSB}, HCState2} =
+ horrendously_dumb_file_handle_cache:read(
+ Hdl, cur, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1, HCState1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
{SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
- load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq);
- {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
+ load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq, HCState2);
+ {{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+ IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>}, HCState1} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
- {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
- file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
+ {{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>}, HCState2} =
+ horrendously_dumb_file_handle_cache:read(
+ Hdl, cur, ?PUBLISH_RECORD_LENGTH_BYTES - 1, HCState1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
load_segment_entries(
Hdl, dict:store(RelSeq, {MsgId, false,
1 == IsPersistentNum},
- SDict), AckCount, HighRelSeq1);
- _ErrOrEoF -> {SDict, AckCount, HighRelSeq}
+ SDict), AckCount, HighRelSeq1, HCState2);
+ {_ErrOrEoF, HCState1} ->
+ {SDict, AckCount, HighRelSeq, HCState1}
end.
deliver_or_ack_msg(SDict, AckCount, RelSeq) ->
@@ -580,37 +629,42 @@ deliver_or_ack_msg(SDict, AckCount, RelSeq) ->
%% Appending Acks to Segments
%%----------------------------------------------------------------------------
-append_acks_to_segment(SegPath, SegNum, AckCounts, Acks) ->
+append_acks_to_segment(SegNum, Acks,
+ State = #qistate { seg_ack_counts = AckCounts }) ->
AckCount = case dict:find(SegNum, AckCounts) of
{ok, AckCount1} -> AckCount1;
error -> 0
end,
- case append_acks_to_segment(SegPath, AckCount, Acks) of
- 0 -> AckCounts;
- ?SEGMENT_ENTRIES_COUNT -> dict:erase(SegNum, AckCounts);
- AckCount2 -> dict:store(SegNum, AckCount2, AckCounts)
+ case append_acks_to_segment(SegNum, AckCount, Acks, State) of
+ {0, State1} -> State1;
+ {?SEGMENT_ENTRIES_COUNT,
+ State1 = #qistate { seg_ack_counts = AckCounts1 }} ->
+ State1 #qistate { seg_ack_counts = dict:erase(SegNum, AckCounts1) };
+ {AckCount2, State1 = #qistate { seg_ack_counts = AckCounts1 }} ->
+ State1 #qistate { seg_ack_counts = dict:store(SegNum, AckCount2,
+ AckCounts1) }
end.
-append_acks_to_segment(SegPath, AckCount, Acks)
+append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir })
when length(Acks) + AckCount == ?SEGMENT_ENTRIES_COUNT ->
- ok = case file:delete(SegPath) of
+ State1 = close_handle(SegNum, State),
+ ok = case file:delete(seg_num_to_path(Dir, SegNum)) of
ok -> ok;
{error, enoent} -> ok
end,
- ?SEGMENT_ENTRIES_COUNT;
-append_acks_to_segment(SegPath, AckCount, Acks)
+ {?SEGMENT_ENTRIES_COUNT, State1};
+append_acks_to_segment(SegNum, AckCount, Acks, State)
when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT ->
- {ok, Hdl} = file:open(SegPath, [raw, binary, read, write,
- {delayed_write, ?SEGMENT_TOTAL_SIZE, 1000}]),
- {ok, _} = file:position(Hdl, {eof, 0}),
- AckCount1 =
+ {Hdl, State1} = get_seg_handle(SegNum, State),
+ {AckCount1, HCState} =
lists:foldl(
- fun (RelSeq, AckCount2) ->
- ok = file:write(Hdl,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>),
- AckCount2 + 1
- end, AckCount, Acks),
- ok = file:sync(Hdl),
- ok = file:close(Hdl),
- AckCount1.
+ fun (RelSeq, {AckCount2, HCStateN}) ->
+ {ok, HCStateM} =
+ horrendously_dumb_file_handle_cache:write(
+ Hdl, eof,
+ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>, HCStateN),
+ {AckCount2 + 1, HCStateM}
+ end, {AckCount, State1 #qistate.hc_state}, Acks),
+ {ok, HCState1} = horrendously_dumb_file_handle_cache:sync(Hdl, HCState),
+ {AckCount1, State1 #qistate { hc_state = HCState1 }}.